运行环境若发现文章有误,敬请指教,感谢
文章目录
运行环境一、参考资料二、案例介绍&准备知识
2.1 Flume Memory Chennel2.2 Flume Avro Source
三、准备案例
3.1 配置 Flume3.2 准备本地sink目录 四、测试
4.1 启动 三个 Flume Agent4.2 观察测试结果
JDK8Hadoop3.3.0 单节点亦可Flume1.9CentOS7 一、参考资料
视频链接
Flume官方文档
Flume官方的架构图:
flume本质上就是一个Agent,通过Source(数据源)、Channel(缓冲管道)、Sink(输出源)这么一套工序,将数据从一个地方移动到另一个地方,那么本次案例则通过这样的思想,实现多个Agent的通信,最终将数据从log日志文件分流到HDFS和另一个本地文件位置。(注:这里暂未涉及拦截器)
案例结构图:
以下内容将用 $FLUME_HOME 代指flume的根目录
官方说明
在Flume里什么是Chennel ?
Channels are the repositories where the events are staged on a agent. Source adds the events and Sink removes it.
译:Channels 是在Agent存储的事件,Source添加事件,Sink删除事件,言简意赅就是缓冲通道
在Flume里什么是Memory Chennel ?
2.2 Flume Avro SourceThe events are stored in an in-memory queue with configurable max size. It’s ideal for flows that need higher throughput and are prepared to lose the staged data in the event of a agent failures. Required properties are in bold.
译:事件以可配置的最大大小存储在内存队列中。它非常适合需要更高吞吐量的流,并且在代理失败时准备好丢失分段数据。言简意赅就是它是由Flume内部实现的一种缓冲通道。
官方说明
三、准备案例Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.
译:监听Avro端口并从外部Avro客户端流接收事件。当与另一个(上一跳)Flume代理上的内置Avro接收器配对时,它可以创建分层收集拓扑。言简意赅就是avro可通过端口号进行消息通讯,这里是通过avro作为数据源,相当于avro的客户端,同时也可以将avro作为sink,即服务端。avro可实现通过端口号进行通信
3.1 配置 Flume
$FLUME_HOME/job/group1/flume-file-flume.conf
a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 默认启动 a1.sources.r1.selector.type = replicating a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/module/flume1.9/test/test.log a1.sources.r1.shell = /bin/bash -c a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop101 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop101 a1.sinks.k2.port = 4142 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
flume-flume-hdfs.conf
a2.sources = r1 a2.sinks = k1 a2.channels = c1 # source端的avro是一个数据接收服务 a2.sources.r1.type = avro a2.sources.r1.bind = hadoop101 a2.sources.r1.port = 4141 a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://hadoop101:8020/flumeTest/%Y%m%d/%H # 上传文件的前缀 a2.sinks.k1.hdfs.filePrefix = flumeTest- # 是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round = true # 多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue = 1 # 重新定义时间单位 a2.sinks.k1.hdfs.roundUnit = hour # 是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true # 最大支持多少个Event就flush数据 a2.sinks.k1.hdfs.batchSize=100 # 设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType = DataStream # 多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval = 30 # 设置每个文件滚动大小大概为128MB a2.sinks.k1.hdfs.rollSize = 134217700 # 文件的滚动与Event数量无关 a2.sinks.k1.hdfs.rollCount = 0 a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
flume-flume-dir.conf
a3.sources = r1 a3.sinks = k1 a3.channels = c2 a3.sources.r1.type = avro a3.sources.r1.bind = hadoop101 a3.sources.r1.port = 4142 a3.sinks.k1.type = file_roll a3.sinks.k1.sink.directory = /opt/module/flume1.9/testSink a3.channels.c2.type = memory a3.channels.c2.capacity = 1000 a3.channels.c2.transactionCapacity = 100 a3.sources.r1.channels = c2 a3.sinks.k1.channel = c23.2 准备本地sink目录
由于Flume本地的sink不会自动创建文件夹,所以需提前创建好,否则sink无法输出到指定位置,这里笔者保存的位置在$FLUME_HOME/testSink 可自行修改。最后flume将采集test.log日志的内容(自己随机输入即可)
$ mkdir /opt/module/flume1.9/testSink $ touch /opt/module/flume1.9/test/test.log四、测试
4.1 启动 三个 Flume Agent
启动顺序,agent 3 / agent 2 ,最后启动agent1:
以下命令均在hadoop101节点进行:
$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf & $ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf & $ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf &
根据配置文件的内容,向本地日志发送消息:
echo hello >> $FLUME_HOME/test/test.log echo haode >> $FLUME_HOME/test/test.log4.2 观察测试结果
可以通过web观察HDFS的情况: http://集群节点主机名:9870/(该地址可通过hdfs-site.xml里的dfs.namenode.http-address属性进行配置,默认为localhost:9870)
出现以下结果,说明Agent1和Agent2都运行成功了
查看本地文件
cd $FLUME_HOME/testSink ls cat 选择后缀为-1的文件
出现这个结果说明Agent1和Agent3都运行成功了,且数据一致。
至此,Flume复制多发的案例就测试完成了。



