主要包括了flume多路复用拦截器的需求分析及原理和代码api及配置的实现!
目录标题- 需求
- 需求分析及原理
- 实现步骤
使用flume采集服务器本地日志,需要按照日志类型的不同来将不同种类的日志发往不同的分析系统
需求分析及原理在开发当中,一台服务器产生的日志类型往往是有很多种的,在不同类型的日志有可能要发往不同的分析系统,在这个时候就会用到flume的channel selector中的Multiplexing结构。其原理是:根据event中header的某个key值,将不同的event发送到不同的channel中,所以我们需要自定义一个interceptor,来为不同类型的event的header中的key赋不同的值
在本demo中,以端口数据来模拟日志,以开头为字母或者数字来模拟不同类型的日志,为此需要自定义interceptor来区分数字和字母,并将其发往不同的分析系统(channel)
分析图如下:
1、创建一个maven项目,并在pox.xml中添加如下配置项
4.0.0 org.example flumeDemo 1.0-SNAPSHOT 8 8 org.apache.flume flume-ng-core 1.9.0
2、创建InterceptorDemo类,并实现Interceptor接口
完整代码如下:
package com.lqs.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.Map;
public class InterceptorDemo implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//需要配合channel选择器使用,向headers当中put对应的参数
//根据传入的数据来判断首位是数字还是字母,判断它是不是同类型的日志
byte[] body = event.getBody();
byte b = body[0];
Map headers = event.getHeaders();
if (b >= '0' && b <= '9') {
//当b为数字时
headers.put("type", "number");
} else if ((b >= 'A' && b <= 'z') || (b >= 'a' && b <= 'z')) {
//当b为字母时
headers.put("type", "letter");
}
//放回headers,这个这里也可以不写
event.setHeaders(headers);
return event;
}
@Override
public List intercept(List list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
public static class BuilderDemo implements Builder {
@Override
public Interceptor build() {
return new InterceptorDemo();
}
@Override
public void configure(Context context) {
}
}
}
记得打包上传到flume的lib文件夹下面
3、编辑flume配置文件
在job/group2文件中创建一下新文件:
我是将包名改成了flumeDemo.jar并上传的
[lqs@bdc112 group2]$ pwd /home/lqs/module/flume-1.9.0/job/group2 [lqs@bdc112 group2]$ touch flume01 [lqs@bdc112 group2]$ touch flume02 [lqs@bdc112 group2]$ touch flume03
打开flume01文件添加如下内容
[lqs@bdc112 group2]$ vim flume01
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 a1.sources.r1.selector.type = multiplexing # 使用headers中的哪些参数 a1.sources.r1.selector.header = type a1.sources.r1.selector.mapping.number = c1 a1.sources.r1.selector.mapping.letter = c2 # a1.sources.r1.selector.default = c4 # 拦截器配置 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.lqs.flume.InterceptorDemo$BuilderDemo # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = bdc112 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = bdc112 a1.sinks.k2.port = 4142 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
打开flume02文件并添加如下内容:
[lqs@bdc112 group2]$ vim flume02
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = bdc112 a1.sources.r1.port = 4141 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
打开flume03文件并添加如下内容:
[lqs@bdc112 group2]$ vim flume03
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = bdc112 a1.sources.r1.port = 4142 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
4、分别启动flume01、flume02、flume03
#第一个窗口 [lqs@bdc112 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume03 -Dflume.root.logger=INFO,console #第二个窗口 [lqs@bdc112 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume02 -Dflume.root.logger=INFO,console #第三个窗口 [lqs@bdc112 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume01 -Dflume.root.loggeINFO,console #注意,以上三条命令表示的都是同一个意思
注意:一定要按照上述的顺序执行,否则会报错!!!
第一个窗口截图
第二个窗口截图
第三个窗口截图
注意:上面三个截图只是告诉你开三个窗口,没有其它意思。
5、在bdc112使用netcat向localhost:44444发送字母和数字
6、查看flume03和flume02的情况
flume03
flume02



