- Flume概述
- Flume定义
- Flume基础架构
- Agent
- Source
- Sink
- Channel
- Event
- Flume入门
- Flume安装部署
- 安装地址
- 安装部署
- Flume入门案例
- 监控端口数据官方案例
- 实时监控单个追加文件
- 实时监控目录下多个新文件
- 实时监控目录下的多个追加文件
- Flume进阶
- Flume事务
- Flume Agent内部原理
- Flume拓扑结构
- Flume企业开发案例
- 自定义Interceptor
- 自定义Source
- 自定义Sink
- Flume数据流监控
Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单
Flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS
Flume基础架构Flume组成架构 :
AgentAgent 是一个 JVM进程,它以事件的形式将数据从源头送至目的
Agent主要有3个部分组成
- Source
- Channel
- Sink
Source 是负责接收数据到 Flume Agent 的组件
Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy
SinkSink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、Hbase、solr、自定义
ChannelChannel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink运作在不同的速率上
Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作
Flume自带两种Channel
- Memory Channel
- File Channel
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据
Event传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地
Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组
Flume入门 Flume安装部署 安装地址- Flume官网地址:http://flume.apache.org/
- 文档查看地址:http://flume.apache.org/FlumeUserGuide.html
- 下载地址:http://archive.apache.org/dist/flume/
将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的 /opt/software 目录下
解压 apache-flume-1.9.0-bin.tar.gz 到 /opt/module/ 目录下
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/
修改 apache-flume-1.9.0-bin 的名称为 flume-1.9.0
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0
将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3
mv guava-11.0.2.jar guava-11.0.2.blk
配置环境变量
vim /etc/profile.d/my_env.sh
#FLUME_HOME export FLUME_HOME=/opt/module/flume-1.9.0 export PATH=$PATH:$FLUME_HOME/bin
生效
source /etc/profileFlume入门案例 监控端口数据官方案例
案例需求:
使用 Flume 监听一个端口,收集该端口数据,并打印到控制台
需求分析:
实现步骤:
- 安装netcat工具
sudo yum install -y nc
- 判断 44444 端口是否被占用
sudo netstat -nlp | grep 44444
-
创建 Flume Agent 配置文件 flume-netcat-logger.conf
-
在 flume 目录下创建 job 文件夹并进入 job 文件夹
mkdir job
cd job/
- 在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf
vim flume-netcat-logger.conf
- 在 flume-netcat-logger.conf 文件中添加
# Name the components on this agent # a1: agent 名 # r1 : a1的 Source 名 # k1: a1的 Sink 名 # c1: a1的 Channel 名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source # a1的输入源类型为 netcat 端口类型 # a1的监听的主机 # a1的监听的端口号 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink # a1 输出目的地是控制台 logger 类型 a1.sinks.k1.type = logger # Use a channel which buffers events in memory # a1的channel 类型是 memory 内存型 # al的channel 总容量 1000 个 event # al的channel 传输时收集到了100条 event 以后提交事务 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel # 将 r1 和 c1 连接起来 # 将 k1 和 cl 连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
注:配置文件来源于官方手册 http://flume.apache.org/FlumeUserGuide.html
- 先开启 Flume 监听端口
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
参数说明:
-
–conf/-c:表示配置文件存储在conf/目录
-
–name/-n:表示给agent起名为a1
-
–conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。
-
-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括 : log、info、warn、error
- 使用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444
- 在 Flume 监听页面观察接收数据情况
nc hadoop102 44444,flume能否接收到?
实时监控单个追加文件案例需求 :
实时监控Hive日志,并上传到HDFS中
需求分析:
实现步骤:
- Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关jar包
检查 /etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确
创建 flume-file-hdfs.conf 文件
创建文件
vim flume-file-hdfs.conf
注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件
添加如下内容
# Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = exec a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://cpucode100:8020/flume/%Y%m%d/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k2.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k2.hdfs.rollInterval = 60 #设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k2.hdfs.rollCount = 0 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
注意:
对于所有与时间相关的转义序列,Event Header 中必须存在以 “ timestamp ” 的 key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加 timestamp)
a3.sinks.k3.hdfs.useLocalTimeStamp = true
- 运行Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
- 开启 Hadoop 和 Hive 并操作 Hive 产生日志
bin/hive
- 在HDFS上查看文件
案例需求:
使用Flume监听整个目录的文件,并上传至HDFS
需求分析:
实现步骤:
- 创建配置文件 flume-dir-hdfs.conf
创建一个文件
vim flume-dir-hdfs.conf
添加如下内容
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = spooldir a3.sources.r3.spoolDir = /opt/module/flume/upload a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true #忽略所有以.tmp结尾的文件,不上传 a3.sources.r3.ignorePattern = ([^ ]*.tmp) # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
- 启动监控文件夹命令
说明:在使用Spooling Directory Source时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每500毫秒扫描一次文件变动
- 向upload文件夹中添加文件
在 /opt/module/flume 目录下创建 upload 文件夹
向 upload 文件夹中添加文件
查看HDFS上的数据
实时监控目录下的多个追加文件Exec source 适用于监控一个实时追加的文件,不能实现断点续传;
Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;
而 Taildir Source 适合用于监听多个实时追加的文件,并且能够实现断点续传
案例需求 :
使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
需求分析:
实现步骤:
创建配置文件flume-taildir-hdfs.conf
创建一个文件
vim flume-taildir-hdfs.conf
添加如下内容
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = TAILDIR a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json a3.sources.r3.filegroups = f1 f2 a3.sources.r3.filegroups.f1 = /opt/module/flume-1.9.0/files/.*file.* a3.sources.r3.filegroups.f2 = /opt/module/flume-1.9.0/files2/.*log.* # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://cpucode100:8020/flume/upload2/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
- 启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
- 向files文件夹中追加内容
在 /opt/module/flume 目录下创建 files 文件夹
mkdir files
向 upload 文件夹中添加文件
echo hello >> file1.txt
echo atguigu >> file2.txt
- 查看HDFS上的数据
Taildir 说明:
Taildir Source 维护了一个 json 格式的 position File ,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传
Position File 的格式如下:
{
"inode":2496272,
"pos":12,
"file":"/opt/module/flume-1.9.0/files/file1.txt"
}
{
"inode":2496275,
"pos":12,
"file":"/opt/module/flume-1.9.0/files/file2.txt"
}
注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用inode 号码来识别文件
Flume进阶 Flume事务
Put事务流程
-
doPut : 将批数据先写入临时缓冲区 putList
-
doComm ne 存队列是否足够合并。
-
doRollback : channel不足, 回滚数据
Take事务
•doTake:将数据取到临时缓冲区takeList, 并将数据发送到HDFS
•doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
•doRollback:数据发送过程中如果出现异常, rollback 将临时缓冲区 takeList 中的数据归还给 channel 内存队列。
重要组件:
1)ChannelSelector
ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。
ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。
2)SinkProcessor
SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor
DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。
这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。
Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。
Flume企业开发案例 自定义Interceptor 自定义Source 自定义Sink Flume数据流监控


