栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flume 学习笔记(二)Flume复制多发案例 | 配置多个Channel对于相对不同的单个Sink | 将本地日志文件内容采集到HDFS和另一个本地位置

Flume 学习笔记(二)Flume复制多发案例 | 配置多个Channel对于相对不同的单个Sink | 将本地日志文件内容采集到HDFS和另一个本地位置

若发现文章有误,敬请指教,感谢

文章目录

运行环境一、参考资料二、案例介绍&准备知识

2.1 Flume Memory Chennel2.2 Flume Avro Source

三、准备案例

3.1 配置 Flume3.2 准备本地sink目录 四、测试

4.1 启动 三个 Flume Agent4.2 观察测试结果

运行环境

JDK8Hadoop3.3.0 单节点亦可Flume1.9CentOS7 一、参考资料


视频链接
Flume官方文档

二、案例介绍&准备知识

Flume官方的架构图:

flume本质上就是一个Agent,通过Source(数据源)、Channel(缓冲管道)、Sink(输出源)这么一套工序,将数据从一个地方移动到另一个地方,那么本次案例则通过这样的思想,实现多个Agent的通信,最终将数据从log日志文件分流到HDFS和另一个本地文件位置。(注:这里暂未涉及拦截器)
案例结构图:

以下内容将用 $FLUME_HOME 代指flume的根目录

2.1 Flume Memory Chennel

官方说明

在Flume里什么是Chennel ?

Channels are the repositories where the events are staged on a agent. Source adds the events and Sink removes it.
译:Channels 是在Agent存储的事件,Source添加事件,Sink删除事件,言简意赅就是缓冲通道

在Flume里什么是Memory Chennel ?

The events are stored in an in-memory queue with configurable max size. It’s ideal for flows that need higher throughput and are prepared to lose the staged data in the event of a agent failures. Required properties are in bold.

译:事件以可配置的最大大小存储在内存队列中。它非常适合需要更高吞吐量的流,并且在代理失败时准备好丢失分段数据。言简意赅就是它是由Flume内部实现的一种缓冲通道。

2.2 Flume Avro Source

官方说明

Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.
译:监听Avro端口并从外部Avro客户端流接收事件。当与另一个(上一跳)Flume代理上的内置Avro接收器配对时,它可以创建分层收集拓扑。言简意赅就是avro可通过端口号进行消息通讯,这里是通过avro作为数据源,相当于avro的客户端,同时也可以将avro作为sink,即服务端。avro可实现通过端口号进行通信

三、准备案例
3.1 配置 Flume

$FLUME_HOME/job/group1/flume-file-flume.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 默认启动
a1.sources.r1.selector.type = replicating

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume1.9/test/test.log
a1.sources.r1.shell = /bin/bash -c

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop101
a1.sinks.k2.port = 4142

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

flume-flume-hdfs.conf

a2.sources = r1
a2.sinks = k1
a2.channels = c1


# source端的avro是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop101
a2.sources.r1.port = 4141

a2.sinks.k1.type = hdfs

a2.sinks.k1.hdfs.path = hdfs://hadoop101:8020/flumeTest/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flumeTest-
# 是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 最大支持多少个Event就flush数据
a2.sinks.k1.hdfs.batchSize=100
# 设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
# 设置每个文件滚动大小大概为128MB
a2.sinks.k1.hdfs.rollSize = 134217700

# 文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume-flume-dir.conf

a3.sources = r1
a3.sinks = k1
a3.channels = c2


a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop101
a3.sources.r1.port = 4142

a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume1.9/testSink

a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
3.2 准备本地sink目录

由于Flume本地的sink不会自动创建文件夹,所以需提前创建好,否则sink无法输出到指定位置,这里笔者保存的位置在$FLUME_HOME/testSink 可自行修改。最后flume将采集test.log日志的内容(自己随机输入即可)

$ mkdir /opt/module/flume1.9/testSink

$ touch /opt/module/flume1.9/test/test.log
四、测试
4.1 启动 三个 Flume Agent

启动顺序,agent 3 / agent 2 ,最后启动agent1:
以下命令均在hadoop101节点进行:

$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf &

$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf &

$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf &

根据配置文件的内容,向本地日志发送消息:

echo hello >> $FLUME_HOME/test/test.log
echo haode >> $FLUME_HOME/test/test.log
4.2 观察测试结果

可以通过web观察HDFS的情况: http://集群节点主机名:9870/(该地址可通过hdfs-site.xml里的dfs.namenode.http-address属性进行配置,默认为localhost:9870)

出现以下结果,说明Agent1和Agent2都运行成功了

查看本地文件

cd $FLUME_HOME/testSink
ls
cat 选择后缀为-1的文件



出现这个结果说明Agent1和Agent3都运行成功了,且数据一致。

至此,Flume复制多发的案例就测试完成了。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775697.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号