- Flume官网
- Flume概述
- Flume架构
- 1. Agent
- 2. Source
- 3. Sink
- 4. Channel
- 5. Event
- 6. ChannalSelector
- 7.Interceptor 拦截器
- 8.SinkProcessor
- Flume安装
- 1.安装地址
- 2.安装部署
- Flume案例
- 1.实时监听端口数据
- 2.实时监控单个追加写入的文件到HDFS
- 2.1 exec source 和 HDFS sink
- 3. 实时监控目录下多个新文件 Spooldir source
- 4.实时监控多目录下的多个追加写入的文件
- `Tail dirsource`
- `taildir source 源码修改`
- 实操
- Flume进阶
- Flume事务
- (1) put事务
- (2) take事务
- Flume 内部数据处理流程
- Flume拓扑结构
- 1.简单串联
- 2.复制和多路复用(1 souce多channel)
- 3.负载均衡和故障转移(1channel 多sink)
- 4.聚合(多source,1sink 最常见)
- Flume 案例2
- 1.复制
- 2.故障转移
- 3.负载均衡
- 4 聚合
- 5.自定义Interceptor
(1)Flume官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/
(1)Flume是cdh公司提供的
(2)海量日志采集、聚合和传输的框架
(3)高可用的,高可靠的,分布式
- 高可用:flume挂了以后还有别的flume可以代替来工作
- 高可靠:数据传输可靠,不丢失
- 分布式的意思是flume可以部署在多台日志服务器上做数据采集然后聚集到一起的意思,其实很flume本身是单体应用,不是分布式集群
(4)实时、批处理数据
Flume在大数据场景的使用
(1) Agent是一个JVM进程,启动Flume就是启动Agent进程,在Linux中显示为Application进程
(2) 它将数据封装成事件(event)的形式将数据从源头送至目的
(3) Agent主要有3个部分组成,Source、Channel、Sink
(1) Source是负责采集数据
(2) Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy、自定义
Sink干两件事
(1) Sink不断地轮询Channel中的event且批量地移除它们(拉取)
(2) 将这些event批量写入到存储系统,或者另一个Flume Agent。
(3) Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、Hbase、solr、自定义。
(1) Channel是位于Source和Sink之间的缓冲区。
其实Channel没有也可以,Channel的主要作用就是提供缓冲,因此,Channel允许Source和Sink运作在不同的速率上。
(2)Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
(3)Flume自带两种Channel:Memory Channel和File Channel。
Memory Channel是内存中的队列,Memory Channel在不需要关心数据丢失的情景下适用。程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel将所有事件写到磁盘,不会丢失数据。
(1)event是Flume框架中数据的传输单元。
(2)event由Header和Body两部分组成。
- Header用来存放该event的一些属性,为K-V结构,默认为空。
- Body用来存放该条数据,形式为字节数组。
(3)最后HDFS上存储的只有数据,没有Header。
6. ChannalSelector(1) 应用场景:一个source后面接多个channel
(2) Flume自带的两种选择器
1.Replicating Channel Selector: 复制选择器 ,默认的
是将source传进来的event从每个channel都有
2.MultiPlexing Channel Selector:多路复用选择器
根据event的header来分配不同的channel
(3)工作时间点:Source到Channel之间
7.Interceptor 拦截器给event的header设置key-value值,该key-value值决定该event去哪个channel,所以要配合MultiPlexing Channel Selector使用
作用在source读取数据后,channel selector之前
8.SinkProcessor应用在一个channel后面接多个sink的场景,用于决定将channel中的哪些event给哪个sink
Flume安装 1.安装地址(1)Flume官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/
(1)将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下
(2)解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
[atguigu@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
(3)修改apache-flume-1.9.0-bin的名称为flume
[atguigu@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
(4)将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
[atguigu@hadoop102 lib]$ rm /opt/module/flume/lib/guava-11.0.2.jarFlume案例 1.实时监听端口数据
需求:使用Flume监听一个端口,收集该端口数据,并打印到控制台
(1) 编写agent配置文件 flume-netcat-logger.conf
(2)开启flume agent进程
第一种写法:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
第二种写法:
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
参数说明:
--conf/-c:表示配置文件存储在conf/目录
--name/-n:对应配置文件中给agent起名的名字为a1
--conf-file/-f:flume本次启动读取的配置文件所在目录及文件名
-Dflume.root.logger=INFO,console:-D表示动态修改flume运行时的配置
flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。
日志级别包括:log、info、warn、error。
(3) 测试
使用linux中netcat工具向本机的44444端口发送内容
[atguigu@hadoop102 ~]$ nc localhost 44444 hello atguigu
在控制台读取的数据:
可以看出是用Event封装了该条消息,并且header为空map,body为byte数组
补充知识:localhost和域名以及0.0.0.0的却别
-
每台主机的localhost都是指本地地址,该地址外部无法访问,仅本机内进程可以访问。
-
hadoop102是公共IP,本机内进程可以访问,别的主机上的进程也能访问
-
本机进程访问localhost不需要网络带宽,走hadoop102需要带宽。
-
nc -l 0.0.0.0 4444 等价于 nc hadoop102 4444 和 nc localhost 4444都能访问
需求:实时监控Hive日志,并上传到HDFS中
(1)Flume要想将数据输出到HDFS,依赖Hadoop相关jar包
- 检查/etc/profile.d/my_env.sh文件,确认Hadoop和Java环境变量配置正确
JAVA_HOME=/opt/module/jdk1.8.0_212 HADOOP_HOME=/opt/module/ha/hadoop-3.1.3 PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export PATH JAVA_HOME HADOOP_HOME2.1 exec source 和 HDFS sink
(2)编写agent配置文件 flume-file-hdfs.conf
[atguigu@hadoop102 job]$ vim flume-file-hdfs.conf
注:要想读取Linux系统中的文件,就得按照Linux命令的规则执行命令。由于Hive日志在Linux系统中所以读取文件的类型选择:exec即execute执行的意思。表示执行Linux命令来读取文件
tail -f 命令默认从文件末尾第十行开始读取,因此不支持断点续传;
添加如下内容:
# Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = exec a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log # Describe the sink a2.sinks.k2.type = hdfs # 设置采集的数据存放在hdfs上的目录 a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H #HDFS中存储采集到的数据的文件名前缀 a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹相关配置: a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 一定改为true,因为文件的滚动是以时间戳来命名的 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 批次写入HDFS a2.sinks.k2.hdfs.batchSize = 100 #设置文件类型,DataStream是不支持压缩 a2.sinks.k2.hdfs.fileType = DataStream # 多久生成一个新的文件 单位s a2.sinks.k2.hdfs.rollInterval = 60 #文件多大开始滚动 单位 字节 a2.sinks.k2.hdfs.rollSize = 134217700 #文件中存储多少个event开始滚动 设置为0 不启用,因为event的大小不固定 a2.sinks.k2.hdfs.rollCount = 0 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
- hdfs sink 说明
(1) hdfs.useLocalTimeStamp
在hdfs sink的配置中,所有与时间相关的转义序列,Event Header中必须存在以 “timestamp”的key,但是默认情况下source读取进来的event的header为空,所以会报错; 解决办法就是配置:hdfs.useLocalTimeStamp这个参数,将其设置为true,此方法会使用TimestampInterceptor自动添加timestamp
本例中,写到hdfs的文件名是用的时间戳,所以这里必须配置
(2) hdfs.fileType
还有压缩配置:这些官网都有说明
(3) hdfs文件滚动
(1) 文件会根据时间和文件大小来滚动事件,不会根据event的个数来滚动文件。
(2) HDFS正在写的文件在HDFS中显示后缀为.tmp,滚动完成后去掉后缀
(3) .tmp的生成,是在有新的数据采集到hdfs的时候才会生成;只有时间达到或者文件大小达到,才会滚动完毕;如果一直没有数据进来,就不会形成.tmp文件
(3)运行Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(4)开启Hadoop和Hive并操作Hive产生日志
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh [atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh [atguigu@hadoop102 hive]$ bin/hive hive (default)>3. 实时监控目录下多个新文件 Spooldir source
需求:使用Flume监听整个目录的文件,并上传至HDFS
- spooldir source
功能:监听整个目录的多个文件,将目录下没有采集过的文件内容采集出来
执行原理:开启Spooldir Source监控某个目标目录,目录中没有被打上后缀标识的文件就会被采集。
1.被采集过的文件会被打上.completed后缀,来标识此文件被采集过了
2.如果新来的文件是.completed结尾的,不会被采集,在.completed文件中增加数据也不会被采集
3.如果1.txt被采集过了,变成1.txt.completed,那么再次创建一个1.txt新文件,会导致任务挂掉
4.新文件名符合忽略的文件名不会被采集
场景能采集目录下的多个新产生的文件的数据,不能对文件进行重复采集。
(1)创建配置文件flume-dir-hdfs.conf
[atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf
添加如下内容
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = spooldir #要监控的目录 a3.sources.r3.spoolDir = /opt/module/flume/upload #文件后缀 采集完成后添加该后缀 a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true #忽略所有以.tmp结尾的文件,不上传 a3.sources.r3.ignorePattern = ([^ ]*.tmp) # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
说明:在使用Spooling Directory Source时,不要在监控目录中创建并持续修改文件;采集完成的文件会以.COMPLETED结尾;被监控文件夹每500毫秒扫描一次文件变动。
(3)在/opt/module/flume目录下创建upload目录
[atguigu@hadoop102 flume]$ mkdir upload
向upload文件夹中添加文件
[atguigu@hadoop102 upload]$ touch atguigu.txt [atguigu@hadoop102 upload]$ touch atguigu.tmp [atguigu@hadoop102 upload]$ touch atguigu.log
(4)查看HDFS上的数据
4.实时监控多目录下的多个追加写入的文件 Tail dirsource功能:采集监控目录下所有文件的追加内容,如果目录下创建新文件也能被采集
特点:多文件监控+断点续传。
断点续传说明:利用json文件存储每个文件的采集点,如果flume挂了后重启能从采集点继续采集。
性能比较:
Exec Source:监控单文件追加,不能断点续传
TailDir Source:监控目录下多个文件的追加和创建,可以断点续传
Spooldir Sourc: 用于同步目录下的新文件,不适合对实时追加日志的文件进行监听并同步;
Taildir说明:
(1)Taildir Source维护了一个json格式的position File,会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。
(2)Position File的格式如下:
{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
position文件中三个信息:
1.Unix/Linux系统内部储存文件元数据的区域叫做inode,每个inode都有一个号码,操作系统用inode号码来识别不同的文件,不使用文件名。如果修改文件名,也不会修改inode号码。
2.pos: 读取文件的字节数
3.file:文件的绝对路径
tailDir重要bug: 如果inode或者文件名改变,会重新采集文件内容
log4j技术产生的日志文件会随着日期而更名,比如今天的到了12点,就从hive.log记录为hive.log-2020-10-31,产生新的hive.log文件记录新的一天的日志;这就会导致日志重复采集;[绝对路径变了,inode不变]
参考:https://blog.csdn.net/maoyuanming0806/article/details/79391010
flume源码包(下载地址:http://mirror.bit.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-src.tar.gz)
修改位置1 position文件的写入
修改位置2 日志文件的读取
只根据文件的inode值进行读取和写入
修改完后打包,上传到flume的lib目录下,替换原来的jar包
案例需求:使用Flume监听整个目录的实时追加写入的文件,并上传至HDFS
(1)创建配置文件flume-taildir-hdfs.conf
[atguigu@hadoop102 job]$ vim flume-taildir-hdfs.conf
添加如下内容
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = TAILDIR # 指定position_file位置 a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json a3.sources.r3.filegroups = f1 f2 # 定义监控的文件 a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.* a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.* # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
(3)向files文件夹中追加内容
在/opt/module/flume目录下创建files文件夹
[atguigu@hadoop102 flume]$ mkdir files #向upload文件夹中添加文件 [atguigu@hadoop102 files]$ echo hello >> file1.txt [atguigu@hadoop102 files]$ echo atguigu >> file2.txt
(4)查看HDFS上的数据
Flume进阶 Flume事务 (1) put事务(1)put事务发生于Source向Channel put数据的过程。数据以batch为单位由Source put到channel中,该batch就是PutList。source采集到的数据后,会先往batch里面放,当攒够一波event后,就会执行doCommit操作,尝试去提交。如果channel有足够的缓冲区,放进channel的队列中,如果放不下就rollback!
(2)putList的event容量就是flume配置文件中的事务容量
(3)rollback的后果就是putList中的数据全部销毁。同时给source报异常,source得知这批数据没有提交成功
注:Flume内部通用的put事务回滚是将putList中的数据全部删除。是否能够重新采集取决于用的是什么Source,对于TailDirSource来说,会记录文件中的采集点,因此可以对于回滚的数据,可以重新采集,对于没有记录断点的Source会丢失数据
(1)sink主动take channel中的 event,拉进takeList,当takeList攒够了一个批次,就要执行doCommit。如果数据全部被sink写出后,就清除takeList的数据,并且channel中 和takeList一样的数据也被清除。如果sink没有把数据成功写出完,就rollback(rollback将takeList原路返还到Channel中,因此要保证channel中有足够的容量用于takeList回滚)
(2)takeList的event容量也是flume配置文件中的事务容量
(3)take事务rollback的后果:可能会造成写出的数据重复。假设一个batch中的数据只写出了一部分发生了回滚,此时会从channel重新take数据,而这些数据已经有一部分被写出了
Flume 内部数据处理流程
(1) 数据在Flume中流程
a) source采集数据—>封装数据成event
b) Interceptor
c) Channel Selector 给event选择Channel
d) event进入channel
e) sinkProcessor 负责将event给哪个sink,以及怎么给
f) sink写出
(2)重要组件
1.ChannelSelector 选出Event将要被发往哪个Channel。
ReplicatingSelector: 会将同一个Event发往所有的Channel
Multiplexing: 会根据header,将不同的Event发往不同的Channel。
Multiplexing需要配合interceptor和多channel使用,header可以在拦截器中设置
2.SinkProcessor
(1)DefaultSinkProcessor 一个channel只能绑定一个sink
(2)LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,可以绑定多个sink
(3)LoadBalancingSinkProcessor可以实现负载均衡的功能
Sink按照一定的规则去channel拉取event,两种常用规则:(1)随机(2)轮寻:理论上是a->b->c-a->b->c… 默认采用轮寻的拉取规则
由于 event不是由channel分发的,而是sink去take的,因此实际测试中会看不到轮寻的效果。(说明:假设当轮寻到sink a的时候,这个时候a可以去channel拿数据,但是此时channel可能会为空,那么a这一次就拿不到数据)
(4)FailoverSinkProcessor 相当于高可用,故障转移;当sink a坏了,由sink b来接替工作
同一时间只能有一个sink工作,如果多个sink都可用,看优先级,谁高用谁。
(5)只有负载均衡SinkProcessor是同时有多个Sink工作的,FailoverSinkProcessor准备了多个Sink,但是同一时刻只能有一个工作。
flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统
复制和多路复用:一个source后面多个channel
应用场景:多个目的地
(1) 复制:将相同的event流向多个目的地。
(2) 多路复用:将不同数据分发到不同的目的地。
背景:一个channel后面多个sink(sink组)
应用场景:sink的负载均衡和高可用
注意:负载均衡是多个sink轮流写出,每个sink读取的数据是不一样的。
负载均衡和故障转移实际应用中sink目的地是相同的;
4.聚合(多source,1sink 最常见)
应用场景: web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。每台服务器部署一个flume采集日志,传送到一个集中收集日志的flume,再由此flume上传到hdfs、hive、hbase等,进行日志分析
1)案例需求
Flume-1监控文件变动,将变动内容传递给Flume-2和Flume-3
Flume-2负责存储到HDFS
Flume-3负责输出到Local FileSystem
2)需求分析:
单source,多个sink,两个sink的内容是一样的,只是目的地不同;因此肯定要用channel selector
Flume1: Exec Source,Replicating ChannelSelector,两个Memory Channel,两个Avro Sink
Flume2: Avro Source,Memory Channel,HDFS Sink
Flume3: Avro Source,Memory Channel,File_roll Sink
(1)配置文件flume-file-flume.conf
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 将数据流复制给所有channel channelSelector配置 a1.sources.r1.selector.type = replicating # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log a1.sources.r1.shell = /bin/bash -c # Describe the sink # sink端的avro是一个数据发送者 a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop102 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop102 a1.sinks.k2.port = 4142 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
(2)配置文件flume-flume-hdfs.conf
Avro source
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source # avro source 是被动方,需要绑定上游avro sink指定的hostname和port a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 // hostname or IP address to listen on a2.sources.r1.port = 4141 # Describe the sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H #上传文件的前缀 a2.sinks.k1.hdfs.filePrefix = flume2- #是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval = 600 #设置每个文件的滚动大小大概是128M a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k1.hdfs.rollCount = 0 # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
(3)配置文件flume-flume-dir.conf
Avro souce,file_roll Sink。
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c2 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop102 a3.sources.r1.port = 4142 # Describe the sink a3.sinks.k1.type = file_roll a3.sinks.k1.sink.directory = /opt/module/data/flume3 # Describe the channel a3.channels.c2.type = memory a3.channels.c2.capacity = 1000 a3.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2
提示:file_roll sink输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。
2.故障转移1)案例需求
Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移的功能
(1)上游flume :flume-netcat-flume.conf
1个netcat source和1个channel、1个sink group(2个sink)
# Name the components on this agent a1.sources = r1 a1.channels = c1 #添加sink组 a1.sinkgroups = g1 a1.sinks = k1 k2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #将组处理器设置为故障转移 a1.sinkgroups.g1.processor.type = failover #为所有sink接收设置优先级 优先级越高,优先选择 (没有指定优先级,按照sink的配置顺序) a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop102 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop102 a1.sinks.k2.port = 4142 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 #设置sink组的成员! a1.sinkgroups.g1.sinks = k1 k2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1
(2)下游flume1:flume-flume-console1.conf
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 a2.sources.r1.port = 4141 # Describe the sink a2.sinks.k1.type = logger # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
(3)下游flume2:flume-flume-console2.conf
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c2 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop102 a3.sources.r1.port = 4142 # Describe the sink a3.sinks.k1.type = logger # Describe the channel a3.channels.c2.type = memory a3.channels.c2.capacity = 1000 a3.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2
(4)执行配置文件
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
注:使用jps -ml查看Flume进程。
当前案例中flume3优先级更高,如果flume3宕机了,flume2会进行写出。如果flume3重新启动,而且flume1中连接flume3的sink过了退避时间,由于flume3优先级更高,因此flume3会接手工作
sink组的相关配置:
backoff:退避,如果sink失败了,就不用轮寻该sink,关进小黑屋一段时间,时间成指数增长,maxTimeOut规定了sink最大小黑屋时长(以后再关进小黑屋不再增长时长。) sink失败 解释:指的是sink下游端出现问题,sink不能正常将数据写出去
#name a1.sources = r1 a1.channels = c1 a1.sinks = k1 k2 # source a1.sources.r1.type = netcat a1.sources.r1.bind = hadoop102 a1.sources.r1.port = 44444 # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # sink processor loadbalancing a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance #负载均衡的 procesor selector 有roud_robin和 random两种 a1.sinkgroups.g1.processor.selector = round_robin #是否规避 a1.sinkgroups.g1.processor.backoff = true # sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop103 a1.sinks.k1.port = 4444 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop104 a1.sinks.k2.port = 5555 #bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(2)下游配置flume2
#name a2.sources = r1 a2.channels = c1 a2.sinks = k1 # source a2.sources.r1.type = avro a2.sources.r1.bind = hadoop103 a2.sources.r1.port = 4444 # channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # sink a2.sinks.k1.type = logger # bind a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
(3)flume3
#name a3.sources = r1 a3.channels = c1 a3.sinks = k1 # source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop104 a3.sources.r1.port = 5555 # channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # sink a3.sinks.k1.type = logger # bind a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1
负载均衡和故障转移总结:
共同点:
1.sink组 a1.sinkgroups = g1 A1.sinkgroups.g1.sinks = k1 k2
2.sink失败退避,当sink没有拉取到数据(channel没有数据或者下游坏了,此时该sink会退避一段时间)
不同点:
failover:要给sink设置优先级,不指定按照配置文件sink的顺序
load balance:两种轮寻方式: round_robin和random
(1)上游:flume1-exec-flume.conf hadoop102上
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/module/group.log # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop104 a1.sinks.k1.port = 4141 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(2)上游:flume2-netcat-flume.conf hadoop103
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = netcat a2.sources.r1.bind = hadoop103 a2.sources.r1.port = 44444 # Describe the sink a2.sinks.k1.type = avro a2.sinks.k1.hostname = hadoop104 a2.sinks.k1.port = 4141 # Use a channel which buffers events in memory a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
(3)下游:flume3-flume-logger.conf
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c1 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop104 a3.sources.r1.port = 4141 # Describe the sink # Describe the sink a3.sinks.k1.type = logger # Describe the channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1
flume1和flume2的avro sink中hostname和port都是一样的。
Avro source bind的主机为自己所在的主机IP,
avro sink 的hostname 和port 都要和下游avro source bind的主机和端口保持一致
(avro souce 为自身,上游sink同步下游souce)
(4)分别开启对应配置文件
上下游flume先启动下游flume,再启动上游
[atguigu@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume1-logger-flume.conf [atguigu@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume2-netcat-flume.conf5.自定义Interceptor
(1)必须要搭配Multiplexing channel selector使用
(2)Multiplexing的原理:根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。
需求分析
使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。
在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。
实现步骤
(1)maven项目中引入以下依赖flume-ng-core
org.apache.flume flume-ng-core 1.9.0
(2)自定义CustomInterceptor类并实现Interceptor接口
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
//(0)实现Interceptor接口
public class CustomInterceptor implements Interceptor {
//可不写
@Override
public void initialize() {
}
//(1)拦截方法,获取body,根据body内容做判断,getHeader,put
@Override
public Event intercept(Event event) {
byte[] body = event.getBody();
if (body[0] < 'z' && body[0] > 'a') {
event.getHeaders().put("type", "letter");
} else if (body[0] > '0' && body[0] < '9') {
event.getHeaders().put("type", "number");
}
return event;
}
//(2)拦截多个event 遍历调用intercept
@Override
public List intercept(List events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
//可不写
public void close() {
}
//(3)静态内部类,拦截器的builder 必须写
public static class Builder implements Interceptor.Builder {
@Override
//(4)获取拦截器对象
public Interceptor build() {
return new CustomInterceptor();
}
@Override
//读取flume中对interceptor的配置,可不写
public void configure(Context context) {
}
}
}
(3)进行打包,打包放进flume的lib目录下
(4)编辑flume配置文件 hadoop102
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # 配置拦截器和多路复用channel selector a1.sources.r1.interceptors = i1 # 内部类全限定名称 a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.CustomInterceptor$Builder # 多路复用选择器 a1.sources.r1.selector.type = multiplexing # 指定header的key为type a1.sources.r1.selector.header = type # --指定value为letter的进入channel c1 a1.sources.r1.selector.mapping.letter = c1 # --指定value为letter的进入channel c2 a1.sources.r1.selector.mapping.number = c2 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop103 a1.sinks.k1.port = 4141 a1.sinks.k2.type=avro a1.sinks.k2.hostname = hadoop104 a1.sinks.k2.port = 4242 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Use a channel which buffers events in memory a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
为hadoop103上的Flume4配置一个avro source和一个logger sink。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop103 a1.sources.r1.port = 4141 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
为hadoop104上的Flume3配置一个avro source和一个logger sink。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop104 a1.sources.r1.port = 4242 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
(4)分别在hadoop102,hadoop103,hadoop104上启动flume进程,注意先后顺序。
(5)在hadoop102使用netcat向localhost:44444发送字母和数字。
(6)观察hadoop103和hadoop104打印的日志。



