Contents
I. Multiplexing and Using Interceptors
II. Replication
III. Aggregation
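I. Multiplexing and Using Interceptors
Requirement: Use Flume to collect local server logs and, depending on the log type, send different kinds of logs to different analysis systems. (In this example, events that start with a digit and events that start with a letter are sent to different systems.)
A single server can produce many types of logs, and different types may need to go to different analysis systems. This calls for the Multiplexing structure of Flume's channel selector. Multiplexing routes each event to a Channel according to the value of a given key in the event's Header, so we need a custom Interceptor that sets that key to a different value for each type of event.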
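The routing rule described above can be sketched in plain Java without any Flume dependency. The class MultiplexingSketch and its methods are hypothetical names invented for this illustration and are not Flume's own selector; the sketch only assumes that a header value maps to a list of channel names and that unmatched events fall back to a default list, which may be empty.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a multiplexing channel selector; not Flume's own class.
class MultiplexingSketch {
    private final String headerName;
    private final Map<String, List<String>> mapping = new HashMap<>();
    private final List<String> defaultChannels;

    MultiplexingSketch(String headerName, List<String> defaultChannels) {
        this.headerName = headerName;
        this.defaultChannels = defaultChannels;
    }

    // Register the channels that receive events whose header equals headerValue.
    void map(String headerValue, String... channels) {
        mapping.put(headerValue, Arrays.asList(channels));
    }

    // Pick channels by header value; fall back to the default list when nothing matches.
    List<String> select(Map<String, String> headers) {
        List<String> channels = mapping.get(headers.get(headerName));
        return channels != null ? channels : defaultChannels;
    }

    public static void main(String[] args) {
        MultiplexingSketch selector =
                new MultiplexingSketch("type", Collections.<String>emptyList());
        selector.map("number", "c1");
        selector.map("letter", "c2");

        Map<String, String> headers = new HashMap<>();
        headers.put("type", "letter");
        System.out.println(selector.select(headers)); // prints [c2]
    }
}
```

In this sketch an event without a matching type header is routed to no channel at all, which is why the interceptor has to tag every event that should be delivered.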
1. Custom interceptor
package com.hpu.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

// Maven dependency: org.apache.flume:flume-ng-core:1.9.0
public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    // Tag each event with a "type" header based on the first byte of its body.
    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        // Leave empty events untagged instead of failing on body[0].
        if (body == null || body.length == 0) {
            return event;
        }
        byte b = body[0];
        if (b >= '0' && b <= '9') {
            event.getHeaders().put("type", "number");
        } else if ((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z')) {
            event.getHeaders().put("type", "letter");
        }
        return event;
    }

    // Apply the single-event logic to every event in the batch.
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class MyBuilder implements Builder {
        // Create the interceptor.
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
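The first-byte rule used by the interceptor can be tried out without a running agent. The following is a minimal sketch: FirstByteClassifier and classify are hypothetical names, and the empty-body check is an assumption added here rather than something Flume requires.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mirrors the interceptor's first-byte rule outside Flume.
class FirstByteClassifier {
    // Returns "number", "letter", or null when the body is empty or starts with anything else.
    static String classify(byte[] body) {
        if (body == null || body.length == 0) {
            return null;
        }
        byte b = body[0];
        if (b >= '0' && b <= '9') {
            return "number";
        }
        if ((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z')) {
            return "letter";
        }
        return null;
    }

    public static void main(String[] args) {
        String[] samples = {"42 requests", "error: disk full", "", "#comment"};
        for (String sample : samples) {
            byte[] body = sample.getBytes(StandardCharsets.UTF_8);
            System.out.println("'" + sample + "' -> " + classify(body));
        }
    }
}
```

Bodies that start with punctuation or a non-ASCII character get no type value under this rule, so they would match neither mapping in the selector configuration.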
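2. Write the configuration files
Package the interceptor as a jar and place it in Flume's lib directory so that the agent can load the class.
Create three new files: conf/group2/flume1.conf, conf/group2/flume2.conf, and conf/group2/flume3.conf.
For Flume1, configure one netcat source and two avro sinks, each fed by its own channel, along with the corresponding ChannelSelector and interceptor.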
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.selector.type = multiplexing
# Which header key the selector reads
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.number = c1
a1.sources.r1.selector.mapping.letter = c2
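# Optional default channel for events that match no mapping (c4 is not defined in this example)
# a1.sources.r1.selector.default = c4
# Interceptor configuration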
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hpu.flume.MyInterceptor$MyBuilder
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
For Flume2, configure one avro source and one logger sink.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
For Flume3, configure one avro source and one logger sink.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
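3. Start and run
Start the downstream agents first, each in its own terminal. The value passed to --name must be a1 (with the digit one), matching the agent name used in the configuration files.
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group2/flume3.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group2/flume2.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group2/flume1.conf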
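4. On hadoop102, use netcat to send letters and digits to localhost:44444 and watch the logs printed by flume2 and flume3. Lines that start with a digit should appear in flume2 (port 4141), and lines that start with a letter in flume3 (port 4142).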
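As an alternative to typing lines into netcat by hand, the test input can be sent from a small Java client. This is a sketch under the assumption that Flume1 is running on the same machine with its netcat source on localhost:44444; NetcatTestClient and sendLines are hypothetical names.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical test client; sends newline-terminated lines to the netcat source.
class NetcatTestClient {
    // Write each line followed by a newline, since the netcat source reads line by line.
    static void sendLines(OutputStream out, List<String> lines) {
        try {
            for (String line : lines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Alternate digits and letters so that both downstream agents receive events.
        List<String> lines = Arrays.asList("123", "abc", "456", "XYZ");
        try (Socket socket = new Socket("localhost", 44444)) {
            sendLines(socket.getOutputStream(), lines);
        } catch (IOException e) {
            System.err.println("Could not reach the netcat source: " + e.getMessage());
        }
    }
}
```

With the routing configured above, "123" and "456" would be expected in the flume2 log and "abc" and "XYZ" in the flume3 log.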
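II. Replication
Requirement: Flume-1 monitors file changes and passes the changed content to Flume-2, which stores it in HDFS. At the same time, Flume-1 passes the same content to Flume-3, which writes it to the local file system.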
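Replication means that every channel attached to the source receives its own copy of every event, in contrast to multiplexing, where each event goes only to the channels selected by its header. The idea can be sketched as follows; ReplicatingSketch is a hypothetical illustration with events and channels modelled as plain strings, not Flume's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of replication: each channel gets a full copy of the event list.
class ReplicatingSketch {
    static Map<String, List<String>> replicate(List<String> events, List<String> channels) {
        Map<String, List<String>> delivered = new LinkedHashMap<>();
        for (String channel : channels) {
            // Copy the list so that one channel's consumer cannot affect another's.
            delivered.put(channel, new ArrayList<>(events));
        }
        return delivered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> out =
                replicate(Arrays.asList("hello", "world"), Arrays.asList("c1", "c2"));
        System.out.println(out); // prints {c1=[hello, world], c2=[hello, world]}
    }
}
```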
1. Create directories
In /opt/module/flume/conf, create the group1 directory:
[example@hadoop102 conf]$ mkdir group1/
In /opt/module/flume/, create the flume3datas directory:
[example@hadoop102 flume]$ mkdir flume3datas
2. Edit the configuration files
Create flume1.conf
Configure one source that receives the log files, plus two channels and two sinks that feed flume2 and flume3 respectively.
vim flume1.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files1/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*log.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# Replicate the data stream to every channel; this is the default and may be omitted
a1.sources.r1.selector.type = replicating
# Describe the sink
# On the sink side, avro acts as the data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Create flume2.conf
Configure a source that receives the output of the upstream Flume, and a sink that writes to HDFS.
vim flume2.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume1/%Y%m%d/%H
# File name prefix
a1.sinks.k1.hdfs.filePrefix = log-
# How often (in seconds) to roll to a new file
a1.sinks.k1.hdfs.rollInterval = 30
# Roll each file at roughly 128 MB
a1.sinks.k1.hdfs.rollSize = 134217700
# Do not roll based on the number of events
a1.sinks.k1.hdfs.rollCount = 0
# Use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type; DataStream writes uncompressed data, and compressed output is also supported
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Create flume3.conf
Configure a source that receives the output of the upstream Flume, and a sink that writes to a local directory.
vim flume3.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /opt/module/flume/flume3datas
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
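The %Y%m%d/%H escapes in hdfs.path expand to a year-month-day directory and an hour subdirectory, and with useLocalTimeStamp = true the timestamp comes from the agent's local clock. The sketch below shows how such a path could be resolved for a given timestamp. It uses java.time rather than Flume's own escape handling, and HdfsPathSketch and resolve are hypothetical names.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of how %Y%m%d/%H maps a timestamp to a directory.
class HdfsPathSketch {
    // yyyyMMdd/HH mirrors the %Y%m%d/%H escapes: 4-digit year, month, day, then 24-hour hour.
    private static final DateTimeFormatter BUCKET = DateTimeFormatter.ofPattern("yyyyMMdd/HH");

    static String resolve(String base, long epochMillis, ZoneId zone) {
        return base + BUCKET.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        String base = "hdfs://hadoop102:8020/flume1/";
        // Directory for an event written right now, using the local time zone.
        System.out.println(resolve(base, System.currentTimeMillis(), ZoneId.systemDefault()));
        // Fixed timestamp for a reproducible result.
        System.out.println(resolve(base, 0L, ZoneOffset.UTC)); // prints hdfs://hadoop102:8020/flume1/19700101/00
    }
}
```

Because the hour is part of the path, events written in different hours land in different directories.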
3. Start and run
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group1/flume3.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group1/flume2.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group1/flume1.conf
Then append to a monitored file:
[example@hadoop102 files1]$ echo hello >> file1.txt
Check the results in HDFS and in the local directory.
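III. Aggregation
Requirement:
Flume-1 on hadoop102 monitors the files matching /opt/module/flume/files1/.*file.*,
Flume-2 on hadoop103 monitors the data stream on a port,
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.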
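The fan-in described in this requirement can be modelled as several independent senders writing into one shared collector. The following sketch uses threads and a queue to stand in for the two upstream agents and Flume-3; AggregationSketch is a hypothetical name, and the real agents communicate over avro rather than an in-memory queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical fan-in model: each upstream runs in its own thread and feeds one collector.
class AggregationSketch {
    static List<String> aggregate(List<List<String>> upstreams) {
        BlockingQueue<String> collector = new LinkedBlockingQueue<>();
        List<Thread> senders = new ArrayList<>();
        for (List<String> events : upstreams) {
            Thread sender = new Thread(() -> collector.addAll(events));
            senders.add(sender);
            sender.start();
        }
        try {
            // Wait until every upstream has delivered its events.
            for (Thread sender : senders) {
                sender.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for senders", e);
        }
        return new ArrayList<>(collector);
    }

    public static void main(String[] args) {
        List<String> merged = aggregate(Arrays.asList(
                Arrays.asList("file: hello"),
                Arrays.asList("port: 1", "port: 2")));
        // The interleaving of the two upstreams is not deterministic.
        System.out.println(merged.size() + " events: " + merged);
    }
}
```

The collector sees all events from both upstreams, but the relative order between them depends on timing, which is also true of what Flume-3 prints.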
1. Preparation:
Distribute Flume to the other hosts:
xsync flume
On each of hadoop102, hadoop103, and hadoop104, create a group3 directory under /opt/module/flume/conf:
mkdir group3
2. Edit the configuration files
Create flume1.conf
Configure a source that monitors local files and a sink that sends the data to the next-level Flume.
Edit the configuration file on hadoop102:
vim flume1.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files1/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*log.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Create flume2.conf
Configure a source that monitors the data stream on port 44444 and a sink that sends the data to the next-level Flume.
Edit the configuration file on hadoop103:
vim flume2.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Create flume3.conf
Configure a source that receives the data streams sent by flume1 and flume2, and a sink that prints the merged stream to the console.
Edit the configuration file on hadoop104:
vim flume3.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
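3. Start and run
Start Flume-3 first so that the upstream avro sinks have something to connect to, then start Flume-1 and Flume-2. Run each command on the host that holds the corresponding file.
On hadoop104:
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group3/flume3.conf
On hadoop102:
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group3/flume1.conf
On hadoop103:
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group3/flume2.conf
On hadoop102, append content to a file under /opt/module/flume/files1 whose name matches .*file.*:
[example@hadoop102 files1]$ echo 'hello' >> file1
On hadoop103, send data to port 44444. The netcat source is bound to localhost, so connect to localhost:
[example@hadoop103 flume]$ nc localhost 44444
Check the data on hadoop104.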



