主要是用来保证数据的一致性,要么都成功,要么都失败。
事务原理图
Flume Agent内部原理总结下:就是说Source中采集到的event并不是直接到channel中的,而是首先会经过一个ChannelProcessor,这个processor会让我们的event去走了拦截器链,随后processor又将经过拦截器链后的event送到ChannelSelector,selector有两种,分别是Replicating Channel Selector和Multiplexing Channel Selector,前者会将event往每个channel中都发一份,而后者是需要经过配置判断往哪些channel中发送。来到channel中后,又会经过一个SinkProcessor,sinkProcessor有三种:defaultSinkProcessor, LoadBalancingSinkProcessor,FailoverSinkProcessor,第一种针对只有一个channel的情况,第二种是负载均衡的,会根据某种算法往不同的channel传输event,第三种则是故障转移高可用的。
原理图
Flume拓扑结构简单串联多个flume一起使用
在这种串联结构下,要求A为客户端,B为服务端,并且A的sink为AVRO,B的source为AVRO,
启动时先启动服务端B
这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。
复制和多路复用复制:一份数据复制到不同的channel中,最后传到hdfs,kafka,其他的flume。
多路复用:一份数据分离,分到不用的channel中,其中一部分给到hdfs,一分部给到kafka等。这个可以通过多路选择器multiplexing channel selector实现
Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。
负载均衡和故障转移Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。
聚合这种模式是我们最常见的,也非常实用,日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务器部署一个flume采集日志,传送到一个集中收集日志的flume,再由此flume上传到hdfs、hive、hbase等,进行日志分析。
Flume企业开发案例 复制和多路复用案例复制
需求: 使用Flume-1监控文件变动,
Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。
Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem。
架构图
准备工作
# 在/opt/module/flume-1.9.0/jobs目录创建replicating文件夹,用来存放flume1,2,3的配置 [atguigu@hadoop102 jobs]$ pwd /opt/module/flume-1.9.0/jobs [atguigu@hadoop102 jobs]$ mkdir replicating [atguigu@hadoop102 jobs]$ cd replicating/ [atguigu@hadoop102 replicating]$ ls
对照结构,编写flume3.conf配置
# 处理数据,写出到本地文件系统 # Name a3.sources = r1 a3.sinks = k1 a3.channels = c1 # source,需要和上一个agent对接,使用avro a3.sources.r1.type = avro a3.sources.r1.bind = localhost a3.sources.r1.port = 8888 # sink a3.sinks.k1.type = file_roll # 记得创建fileroll文件夹 a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/jobs/fileroll # channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # Bind a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1
对照结构,编写flume2.conf配置
# 处理数据,写出到hdfs # Name a2.sources = r1 a2.sinks = k1 a2.channels = c1 # source,需要和上一个agent对接,使用avro a2.sources.r1.type = avro a2.sources.r1.bind = localhost a2.sources.r1.port = 7777 # sink a2.sinks.k1.type = hdfs # 注意,这里的path是hdfs中core-site.xml中的fs.defaultFS的配置 a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H #上传文件的前缀 a2.sinks.k1.hdfs.filePrefix = logs- #是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件,单位是秒 a2.sinks.k1.hdfs.rollInterval = 60 #设置每个文件的滚动大小128M a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k1.hdfs.rollCount = 0 # channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
对照结构,编写flume1.conf配置
# 这个是上游的agent,需要对接下游两个agent flume2,flume3 a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 指定source配置 a1.sources.r1.type = TAILDIR # 可以设置多个组 a1.sources.r1.filegroups = f1 # 监控的目录 a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*.txt # 断点续传(json格式) a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json # channel Selector 配置 a1.sources.r1.selector.type = replicating # 指定sink配置(两个sink) a1.sinks.k1.type = avro a1.sinks.k1.hostname = localhost a1.sinks.k1.port = 7777 a1.sinks.k2.type = avro a1.sinks.k2.hostname = localhost a1.sinks.k2.port = 8888 # 指定channel配置(两个channel) a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 10000 a1.channels.c2.transactionCapacity = 100 # 指定source,sink,channel之间的绑定关系 a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
执行flume命令
# flume3 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console # flume2 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console # flume1 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console
往/opt/module/flume-1.9.0/jobs/taildir/.*.txt追加内容
[atguigu@hadoop102 taildir]$ pwd /opt/module/flume-1.9.0/jobs/taildir [atguigu@hadoop102 taildir]$ ll 总用量 16 -rw-rw-r--. 1 atguigu atguigu 12 10月 10 19:29 file1.txt -rw-rw-r--. 1 atguigu atguigu 13 10月 10 19:30 file2.txt [atguigu@hadoop102 taildir]$ echo hello >> file1.txt [atguigu@hadoop102 taildir]$ echo pihao >> file2.txt [atguigu@hadoop102 taildir]$
查看hdfs中的内容
查看本地系统的内容
[atguigu@hadoop102 fileroll]$ pwd /opt/module/flume-1.9.0/jobs/fileroll [atguigu@hadoop102 fileroll]$ [atguigu@hadoop102 fileroll]$ ll 总用量 8 # 这个本地有点奇怪,它每隔30s就会生成一个文件,无论是否有新增内容 -rw-rw-r--. 1 atguigu atguigu 0 10月 10 23:52 1633881154435-1 -rw-rw-r--. 1 atguigu atguigu 0 10月 10 23:53 1633881154435-2 -rw-rw-r--. 1 atguigu atguigu 6 10月 10 23:55 1633881154435-3 -rw-rw-r--. 1 atguigu atguigu 6 10月 10 23:55 1633881154435-4 [atguigu@hadoop102 fileroll]$ cat 1633881154435-3 hello [atguigu@hadoop102 fileroll]$ cat 1633881154435-4 pihao
负载均衡和故障转移案例测试完成,现在一份数据被复制传输到了不同的目的地,复制测试OK!
需求:使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移的功能
先做负载均衡
使用Flume1监控一个端口,将监控的内容以轮询或者随机的方式发送到flume2和flume3 flume2将数据输出到控制台 flume3将数据输出到控制台
架构图
flume3.conf
# 处理数据,写出到本地文件系统 # Name a3.sources = r1 a3.sinks = k1 a3.channels = c1 # source,需要和上一个agent对接,使用avro a3.sources.r1.type = avro a3.sources.r1.bind = localhost a3.sources.r1.port = 8888 # sink a3.sinks.k1.type = logger # channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # Bind a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1
flume2.conf
# 处理数据,写出到hdfs # Name a2.sources = r1 a2.sinks = k1 a2.channels = c1 # source,需要和上一个agent对接,使用avro a2.sources.r1.type = avro a2.sources.r1.bind = localhost a2.sources.r1.port = 7777 # sink a2.sinks.k1.type = logger # channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
flume1.conf
# 这个是上游的agent,需要对接下游两个agent flume2,flume3 a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 # 指定source配置 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 6666 # channel Selector 配置 a1.sources.r1.selector.type = replicating # 指定sink配置(两个sink) a1.sinks.k1.type = avro a1.sinks.k1.hostname = localhost a1.sinks.k1.port = 7777 a1.sinks.k2.type = avro a1.sinks.k2.hostname = localhost a1.sinks.k2.port = 8888 # sink processor a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance # 指定selector为轮询或者随机 round_robin/random a1.sinkgroups.g1.processor.selector = round_robin # 指定channel配置(两个channel) a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # 指定source,sink,channel之间的绑定关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1
分别创建上面的配置文件
[atguigu@hadoop102 jobs]$ pwd /opt/module/flume-1.9.0/jobs [atguigu@hadoop102 jobs]$ mkdir loadbalancing [atguigu@hadoop102 jobs]$ cd loadbalancing/ [atguigu@hadoop102 loadbalancing]$ vim flume1.conf [atguigu@hadoop102 loadbalancing]$ vim flume2.conf [atguigu@hadoop102 loadbalancing]$ vim flume3.conf [atguigu@hadoop102 loadbalancing]$ ll 总用量 12 -rw-rw-r--. 1 atguigu atguigu 992 10月 11 20:32 flume1.conf -rw-rw-r--. 1 atguigu atguigu 430 10月 11 20:32 flume2.conf -rw-rw-r--. 1 atguigu atguigu 443 10月 11 20:32 flume3.conf [atguigu@hadoop102 loadbalancing]$
执行flume命令
# flume3 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume3.conf -n a3 -Dflume.root.logger=INFO,console # flume2 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume2.conf -n a2 -Dflume.root.logger=INFO,console # flume1 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume1.conf -n a1 -Dflume.root.logger=INFO,console
测试发送数据
[atguigu@hadoop102 loadbalancing]$ nc localhost 6666 hello OK pihao OK flume OK
测试成功,发现每次只往一个sink中传数据,并且实现轮询。注意:这里的轮询并不是指单个event的轮询。而是指的是sink往channel中去抓取数据是轮询的,有可能这次flume2获取到数据,然后flume3去抓取,结果channel中没有数据了。下次有是flume2是获取,有获取到数据了,对吧
测试random
只需要把flume1.conf中的sink processor改为:
a1.sinkgroups.g1.processor.selector = random
测试ok,至此,负载均衡均衡的 轮询和随机模式测试完毕,接下来测试一下故障转移
故障转移测试
flume1监控端口数据,将监控的内容发送到active的flume,待这个active的flume宕机后自动转移到其他的flume,实现高可用
架构图
这里例子其实和上面负载均衡的例子的架构差不多,唯一的区别就是sink processor,上面的例子是load_balancing sink processor,而这个例子是failover sink processor
配置flume1.conf
# 这个是上游的agent,需要对接下游两个agent flume2,flume3 a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 # 指定source配置 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 6666 # channel Selector 配置 a1.sources.r1.selector.type = replicating # 指定sink配置(两个sink) a1.sinks.k1.type = avro a1.sinks.k1.hostname = localhost a1.sinks.k1.port = 7777 a1.sinks.k2.type = avro a1.sinks.k2.hostname = localhost a1.sinks.k2.port = 8888 # sink processor a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 # 配置故障转移的sink processor a1.sinkgroups.g1.processor.type = failover # 配置优先级(看哪个sink成为active) a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 # 指定channel配置(两个channel) a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 # 指定source,sink,channel之间的绑定关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1
分别创建flume配置文件
[atguigu@hadoop102 jobs]$ pwd /opt/module/flume-1.9.0/jobs [atguigu@hadoop102 jobs]$ mkdir failover [atguigu@hadoop102 jobs]$ cd failover/ [atguigu@hadoop102 failover]$ vim flume1.conf [atguigu@hadoop102 failover]$ vim flume2.conf [atguigu@hadoop102 failover]$ vim flume3.conf [atguigu@hadoop102 failover]$ ll 总用量 12 -rw-rw-r--. 1 atguigu atguigu 1054 10月 11 21:06 flume1.conf -rw-rw-r--. 1 atguigu atguigu 430 10月 11 21:06 flume2.conf -rw-rw-r--. 1 atguigu atguigu 443 10月 11 21:06 flume3.conf [atguigu@hadoop102 failover]$
执行flume命令
# flume3 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console # flume2 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console # flume1 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console
测试发送数据
[atguigu@hadoop102 failover]$ nc localhost 6666 hello OK pihao OK nihoaya OK haha OK hhe OK
聚合案例现在手动将flume3关闭,然后再次发送数据,发现现在的数据给到flume2的,实现故障转移。
最后我又重启启动flume3,连接成功后再次发送数据,发现数据又重新给到flume3了
需求:
hadoop102上的Flume1监控文件/opt/module/flume-1.9.0/jobs/taildir/.txt,
hadoop103上的Flume2监控一个端口6666的数据流,
Flume-1与Flume-2将数据发送给hadoop104上的Flume3,Flume3将最终数据打印到控制台。
架构图
编写flume1.conf配置
#Named a1.sources = r1 a1.channels = c1 a1.sinks = k1 #Source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*.txt a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 #Sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop104 a1.sinks.k1.port = 8888 #Bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
编写flume2.conf配置
a2.sources = r1 a2.channels = c1 a2.sinks = k1 #Source a2.sources.r1.type = netcat a2.sources.r1.bind = localhost a2.sources.r1.port = 6666 #Channel a2.channels.c1.type = memory a2.channels.c1.capacity = 10000 a2.channels.c1.transactionCapacity = 100 #Sink a2.sinks.k1.type = avro a2.sinks.k1.hostname = hadoop104 a2.sinks.k1.port = 8888 #Bind a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
编写flume3.conf配置
#Named a3.sources = r1 a3.channels = c1 a3.sinks = k1 #Source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop104 a3.sources.r1.port = 8888 #Channel a3.channels.c1.type = memory a3.channels.c1.capacity = 10000 a3.channels.c1.transactionCapacity = 100 #Sink a3.sinks.k1.type = logger #Bind a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1
分别创建好配置文件
[atguigu@hadoop102 jobs]$ pwd /opt/module/flume-1.9.0/jobs [atguigu@hadoop102 jobs]$ mkdir aggregate [atguigu@hadoop102 jobs]$ cd aggregate/ [atguigu@hadoop102 aggregate]$ vim flume1.conf [atguigu@hadoop102 aggregate]$ vim flume2.conf [atguigu@hadoop102 aggregate]$ vim flume3.conf [atguigu@hadoop102 aggregate]$ ll 总用量 12 -rw-rw-r--. 1 atguigu atguigu 538 10月 11 21:35 flume1.conf -rw-rw-r--. 1 atguigu atguigu 404 10月 11 21:35 flume2.conf -rw-rw-r--. 1 atguigu atguigu 354 10月 11 21:35 flume3.conf [atguigu@hadoop102 aggregate]$
注意现在配置文件已经创建完毕,但是hadoop103和hadoop104机器上是没有flume的,需要分发一下以及flume的环境变量的配置也要分发。
#在hadoop102执行 [atguigu@hadoop102 module]$ pwd /opt/module [atguigu@hadoop102 module]$ my_rsync.sh flume-1.9.0/ [atguigu@hadoop102 module]$ scp -r /etc/profile.d/my_env.sh root@hadoop103:/etc/profile.d/ [atguigu@hadoop102 module]$ scp -r /etc/profile.d/my_env.sh root@hadoop104:/etc/profile.d/ # 分别在103,104测试环境变量有没有生效
执行flume命令
# 分别在各个机器按如下顺序启动 # flume3 (104) 下游服务端最先启动 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregate/flume3.conf -n a3 -Dflume.root.logger=INFO,console # flume1 (102) flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregate/flume1.conf -n a1 -Dflume.root.logger=INFO,console # flume2 (103) flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregate/flume2.conf -n a2 -Dflume.root.logger=INFO,console
三台启动成功!
测试
# 在102测试文件追加 [atguigu@hadoop102 taildir]$ pwd /opt/module/flume-1.9.0/jobs/taildir [atguigu@hadoop102 taildir]$ ll 总用量 16 -rw-rw-r--. 1 atguigu atguigu 18 10月 10 23:54 file1.txt -rw-rw-r--. 1 atguigu atguigu 19 10月 10 23:55 file2.txt -rw-rw-r--. 1 atguigu atguigu 11 10月 10 19:30 log1.log -rw-rw-r--. 1 atguigu atguigu 11 10月 10 19:30 log2.log [atguigu@hadoop102 taildir]$ echo hello >> file1.txt [atguigu@hadoop102 taildir]$ echo pihao >> file2.txt [atguigu@hadoop102 taildir]$ # 在103测试端口发送内容 [atguigu@hadoop103 ~]$ nc localhost 6666 hello2 OK pihao2 OK # 查看104控制台的输出结果
自定义Interceptorok,聚合的案例测试完成
需求: 试想这么一个场景,就是我采集的数据里面有多种信息,其中包括bigdata,java,那么我想把包含bigdata的包含java的数据分发,分别运输到不同的目的地。怎么实现呢?
分析
# 可以使用多路选择器,将日志发送到想指定的sink中。先瞄一眼官网的多路选择器配置: a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4 # 大致的意思是说从event中获取header然后,判断state这个字段的值,然后分发到指定的channel.实现自定义配置channel 那么再想一下,这个event在进入这个channel之前,我们需要为这个event设置header头。再哪里设置呢?那就是在interceptor拦截器中可以设置。编写java工程
package com.pihao.flume.interceptor;
import com.nimbusds.jose.util.StandardCharset;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.Map;
public class EventHeaderInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 获取header
Map headers = event.getHeaders();
// 获取body
String body = new String(event.getBody(), StandardCharset.UTF_8);
//判断body中是否包含bigdata,java
if(body.contains("bigdata")){
headers.put("whichChannel","bigdata");
}else if(body.contains("java")){
headers.put("whichChannel","java");
}else{
// headers.put("whichChannel","other"); 这个可以不写,在多路选择器中有一个default选项。就是这种情况
}
return event;
}
@Override
public List intercept(List events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() {
}
public static class MyBuilder implements Builder{
@Override
public Interceptor build() {
return new EventHeaderInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
# 然后打包发送到102的flume的lib目录下 [atguigu@hadoop102 lib]$ ls|grep pihao pihao_event_interceptor.jar [atguigu@hadoop102 lib]$编写配置文件
多路选择器案例 flume1监控端口的数据,将监控的数据发送给flume2,flume3,flume4.包含bigdata,java,其他发给flume4 flume2,flume3,flume4直接输出
架构图
编写配置文件
#flume1.conf #Named a1.sources = r1 a1.channels = c1 c2 c3 a1.sinks = k1 k2 k3 #Source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 5555 #channel selector a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = whichChannel a1.sources.r1.selector.mapping.bigdata = c1 a1.sources.r1.selector.mapping.java = c2 a1.sources.r1.selector.default = c3 # Interceptor a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.pihao.flume.interceptor.EventHeaderInterceptor$MyBuilder #Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 10000 a1.channels.c2.transactionCapacity = 100 a1.channels.c3.type = memory a1.channels.c3.capacity = 10000 a1.channels.c3.transactionCapacity = 100 #Sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = localhost a1.sinks.k1.port = 6666 a1.sinks.k2.type = avro a1.sinks.k2.hostname = localhost a1.sinks.k2.port = 7777 a1.sinks.k3.type = avro a1.sinks.k3.hostname = localhost a1.sinks.k3.port = 8888 #Bind a1.sources.r1.channels = c1 c2 c3 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2 a1.sinks.k3.channel = c3 #flume2.conf a2.sources = r1 a2.channels = c1 a2.sinks = k1 #Source a2.sources.r1.type = avro a2.sources.r1.bind = localhost a2.sources.r1.port = 6666 #Channel a2.channels.c1.type = memory a2.channels.c1.capacity = 10000 a2.channels.c1.transactionCapacity = 100 #Sink a2.sinks.k1.type = logger #Bind a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1 #flume3.conf #Named a3.sources = r1 a3.channels = c1 a3.sinks = k1 #Source a3.sources.r1.type = avro a3.sources.r1.bind = localhost a3.sources.r1.port = 7777 #Channel a3.channels.c1.type = memory a3.channels.c1.capacity = 10000 a3.channels.c1.transactionCapacity = 100 #Sink a3.sinks.k1.type = logger #Bind a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1 #flume4.conf #Named a4.sources = r1 a4.channels = c1 a4.sinks = k1 #Source a4.sources.r1.type = avro a4.sources.r1.bind = localhost a4.sources.r1.port = 8888 #Channel a4.channels.c1.type = memory a4.channels.c1.capacity = 10000 a4.channels.c1.transactionCapacity = 100 #Sink a4.sinks.k1.type = logger #Bind a4.sources.r1.channels = c1 a4.sinks.k1.channel = c1 启动: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume4.conf -n a4 -Dflume.root.logger=INFO,console flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume3.conf -n a3 -Dflume.root.logger=INFO,console flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume2.conf -n a2 -Dflume.root.logger=INFO,console flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume1.conf -n a1 -Dflume.root.logger=INFO,console
ok,启动成功,现在开始往5555端口发送数据,看看能不能实现数据分开 bigdata,java
[atguigu@hadoop102 multi]$ nc localhost 5555 bigdata # flume2 OK java # flume3 OK other # flume4 OK
自定义Source,自定义Sink多路案例测试成功
Flume数据流监控官网已经定义好了大部分的Source以及Sink,这部分内容不重要
企业真实面试题使用Ganglia监控,自己去了解安装下
你是如何实现Flume数据传输的监控的? 使用第三方框架Ganglia实时监控Flume。 Flume的Source,Sink,Channel的作用?你们Source是什么类型? 1)作用 (1)Source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy (2)Channel组件对采集到的数据进行缓存,可以存放在Memory或File中。 (3)Sink组件是用于把数据发送到目的地的组件,目的地包括Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定义。 2)我公司采用的Source类型为: (1)监控后台日志:exec (2)监控后台产生日志的端口:netcat
Flume的Channel Selectors?
Flume参数调优? 1)Source 增加Source个(使用Tair Dir Source时可增加FileGroups个数)可以增大Source的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个Source 以保证Source有足够的能力获取到新产生的数据。 batchSize参数决定Source一次批量运输到Channel的event条数,适当调大这个参数可以提高Source搬运Event到Channel时的性能。 2)Channel type 选择memory时Channel的性能最好,但是如果Flume进程意外挂掉可能会丢失数据。type选择file时Channel的容错性更好,但是性能上会比memory channel差。 使用file Channel时dataDirs配置多个不同盘下的目录可以提高性能。 Capacity 参数决定Channel可容纳最大的event条数。transactionCapacity 参数决定每次Source往channel里面写的最大event条数和每次Sink从channel里面读的最大event条数。transactionCapacity需要大于Source和Sink的batchSize参数。 3)Sink 增加Sink的个数可以增加Sink消费event的能力。Sink也不是越多越好够用就行,过多的Sink会占用系统资源,造成系统资源不必要的浪费。 batchSize参数决定Sink一次批量从Channel读取的event条数,适当调大这个参数可以提高Sink从Channel搬出event的性能。
Flume的事务机制? Flume的事务机制(类似数据库的事务机制):Flume使用两个独立的事务分别负责从Soucrce到Channel,以及从Channel到Sink的事件传递。比如spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到Channel且提交成功,那么Soucrce就将该文件标记为完成。同理,事务以类似的方式处理从Channel到Sink的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到Channel中,等待重新传递。



