Whenever Flume comes up, the first things that come to mind are its three components: Source, Channel, and Sink. Together, these three form Flume's data transfer pipeline.
In this project, the collection-tier Flume agents are deployed on two nodes: one agent collects event-type log data, and the other collects startup-type log data.
The Source is a Taildir Source (not available in Flume 1.6; it was only introduced in Flume 1.7). Its main advantage is support for resuming from a recorded position: every time data is read, the current read position is written down, and the location of that position file can be set in the Flume configuration.
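The resume-from-position mechanism can be illustrated with a small sketch (a simplified, hypothetical model in Python, not Flume's actual implementation): the reader persists its byte offset to a JSON position file after every read, so a restarted process continues exactly where it left off instead of re-reading the whole log.

```python
import json
import os

def tail_once(log_path: str, position_file: str) -> list:
    """Read any new lines from log_path, resuming at the offset
    recorded in position_file (a simplified taildir-style model)."""
    # Load the last recorded offset, defaulting to 0 on the first run.
    offset = 0
    if os.path.exists(position_file):
        with open(position_file) as f:
            offset = json.load(f).get(log_path, 0)

    with open(log_path) as f:
        f.seek(offset)              # resume where the last run stopped
        lines = f.readlines()
        offset = f.tell()           # remember how far we got this time

    # Persist the new offset so a restart does not re-read old data.
    with open(position_file, "w") as f:
        json.dump({log_path: offset}, f)
    return [line.rstrip("\n") for line in lines]
```

Flume's real position file keeps one JSON record per tailed file (inode, position, path); the sketch tracks only a single offset to show the idea.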
There are two common Channel types, MemoryChannel and FileChannel. MemoryChannel is memory-based and therefore fast, but its data is lost the moment power is cut; FileChannel is slower, but far safer, because events are persisted to disk. The Sink is a KafkaSink: when the downstream of the sink is Kafka anyway, using KafkaSink saves a hop and therefore time.
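The trade-off between the two channel types can be shown with a toy model (pure Python, purely illustrative, not Flume code): a memory channel holds events in RAM and loses them on a crash, while a file channel writes each event to disk before acknowledging it, so events survive a restart.

```python
import os

class MemoryChannel:
    """Toy in-memory channel: fast, but events vanish if the process dies."""
    def __init__(self):
        self.queue = []
    def put(self, event: str):
        self.queue.append(event)    # just an append in RAM
    def crash_and_restart(self):
        self.queue = []             # RAM contents are gone after a crash

class FileChannel:
    """Toy file-backed channel: a disk write per event, but durable."""
    def __init__(self, path: str):
        self.path = path
    def put(self, event: str):
        with open(self.path, "a") as f:
            f.write(event + "\n")   # persisted before the put returns
    def crash_and_restart(self):
        pass                        # nothing lost: the data is on disk
    @property
    def queue(self):
        if not os.path.exists(self.path):
            return []
        with open(self.path) as f:
            return [line.rstrip("\n") for line in f]
```

The extra disk write per `put` is exactly where FileChannel pays its speed penalty, and exactly why it survives the simulated crash.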
In the Flume installation directory, go into the conf directory and create the agent configuration there: vim file-flume-kafka.conf:
a1.sources = r1                     # component definitions: one source, r1
a1.channels = c1 c2                 # two channels: c1, c2
a1.sinks = k1 k2                    # two sinks: k1, k2

# configure source
a1.sources.r1.type = TAILDIR        # taildir source (resumes from a recorded position)
a1.sources.r1.positionFile = /opt/module/flume/log_position.json   # where the read positions are recorded
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+   # target files to tail, names starting with "app"
a1.sources.r1.fileHeader = true     # add a header carrying the absolute path of the source file
a1.sources.r1.channels = c1 c2      # the channels this source feeds

# interceptors
a1.sources.r1.interceptors = i1 i2  # two interceptors
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder   # ETL interceptor (cleans the data)
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder  # log-type interceptor

# selector
a1.sources.r1.selector.type = multiplexing   # route events by a header value
a1.sources.r1.selector.header = logType      # split the data by log type
a1.sources.r1.selector.mapping.start = c1    # logType "start" -> channel c1
a1.sources.r1.selector.mapping.event = c2    # logType "event" -> channel c2

# configure channels
a1.channels.c1.type = memory                 # c1 is a memory channel
a1.channels.c1.capacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.byteCapacityBufferPercentage = 20

# configure sinks
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1

# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2
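The interceptor-then-selector flow configured above can be sketched in miniature (a hypothetical Python model of the logic only; the real LogETLInterceptor and LogTypeInterceptor are Java classes whose code is not shown here, and the '"start"' marker is an assumed convention for this illustration): the ETL step drops malformed lines, the type step tags each event with a logType header, and the multiplexing step picks a channel from that header.

```python
import json

def etl_intercept(body: str):
    """ETL step: keep only lines that parse as JSON (a stand-in for
    whatever validation LogETLInterceptor actually performs)."""
    try:
        json.loads(body)
        return body
    except json.JSONDecodeError:
        return None                 # malformed events are dropped here

def type_intercept(body: str) -> dict:
    """Type step: tag the event with a logType header, assuming start
    logs contain the marker '"start"' (an illustrative convention)."""
    log_type = "start" if '"start"' in body else "event"
    return {"headers": {"logType": log_type}, "body": body}

def multiplex(event: dict, mapping: dict) -> str:
    """Selector step: choose a channel from the logType header,
    mirroring the selector.mapping.* lines in the config."""
    return mapping[event["headers"]["logType"]]

def pipeline(line: str, mapping: dict):
    body = etl_intercept(line)
    if body is None:
        return None                 # cleaned out by the ETL interceptor
    event = type_intercept(body)
    return multiplex(event, mapping), event
```

With mapping = {"start": "c1", "event": "c2"}, a well-formed startup log lands on c1 and everything else well-formed lands on c2, matching the selector section of the config.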



