一 Apache Flume 1. 概述
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件。
Flume的核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume再删除自己缓存的数据。
Flume支持定制各类数据发送方,用于收集各类型数据;同时,Flume支持定制各种数据接受方,用于最终存储数据。一般的采集需求,通过对flume的简单配置即可实现。针对特殊场景也具备良好的自定义扩展能力。因此,flume可以适用于大部分的日常数据采集场景。
当前Flume有两个版本。Flume 0.9X版本的统称Flume OG(original generation),Flume1.X版本的统称Flume NG(next generation)。由于Flume NG经过核心组件、核心配置以及代码架构重构,与Flume OG有很大不同,使用时请注意区分。改动的另一原因是将Flume纳入 apache 旗下,Cloudera Flume 改名为 Apache Flume。
2.运行机制Flume系统中核心的角色是agent,agent本身是一个Java进程,一般运行在日志收集节点。
Agent:代理,flume集群中,每个节点都是一个agent,包含了flume单节点:接受、封装、承载、传输event到目的地的过程。这个过程中包含三部分(source、channel、sink)。
每一个agent相当于一个数据传递员,内部有三个组件:
Source:采集源,用于跟数据源对接,以获取数据;
Sink:下沉地,采集数据的传送目的地,用于往下一级agent传递数据或者往 最终存储系统传递数据;
Channel:agent内部的数据传输通道,用于从source将数据传递到sink;
在整个数据的传输的过程中,流动的是event,它是Flume内部数据传输的最基本单元。event将传输的数据进行封装。如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。
一个完整的event包括:event headers、event body、event信息,其中event信息就是flume收集到的日记记录。
2.1 简单结构单个agent采集数据
2.2 复杂结构多级agent之间串联
3. Flume安装部署上传安装包到数据源所在节点上
然后解压 tar -zxvf apache-flume-1.9.0-bin.tar.gz
然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME
#解压缩到servers目录下 tar -xvzf apache-flume-1.9.0-bin.tar.gz -C ../servers/
保存并退出即可
4.flume初体验进入conf目录下,创建该文件,配置内容
vim http_logger.properties
配置如下内容:
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = http a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 22222 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
启动服务
在/opt/servers/flume-1.9.0/目录下执行
bin/flume-ng agent -c conf -f conf/http_logger.properties -n a1 -Dflume.root.logger=INFO,console
注:
-c conf 指定flume自身的配置文件所在目录
-f conf/netcat-logger.con 指定我们所描述的采集方案
-n a1 指定我们这个agent的名字
flume启动后占用当前窗口,复制一个新的窗口在任意目录下执行以下
curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://hadoop01:22222
配置文件详解
#a1为自定义的agent名字,与启动命令中的-n属性对应 a1.sources = r1 #定义agent的数据源source,可以有多个。 a1.sinks = k1 #定义agent的数据出处,可以有多个。 a1.channels = c1 #定义agent的通道,一般有多少个sink就有多少个channel a1.sources.r1.type = http #指定source的类型http a1.sources.r1.bind = 0.0.0.0 #指定source的来源。一般为本机,被动接收 a1.sources.r1.port = 22222 #指定端口 a1.sinks.k1.type = logger //前端日志打印输出 a1.channels.c1.type = memory #指定channel的类型为 内存 a1.channels.c1.capacity = 1000 #指定存储容量,避免强制抢占内存影响其他进程的正常运行 a1.channels.c1.transactionCapacity = 100 #指定事务容量 a1.sources.r1.channels = c1 #r1源指定哪一个管道channels,即c1来绑定传输source a1.sinks.k1.channel = c1 #指定的管道要传递哪个下沉点,即c1管道绑定sink
正式操作:
保存并退出
flume启动后占用当前窗口,需要在一个新的窗口的任意目录下执行以下命令
查看占用的窗口下接受的源数据信息
二 Source数据源 练习案例 1.avro源的传输,对象都需要序列化才能转移发送和传输,数据的传输都需要序列化和反序列化
vim avro_logger.conf
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 22222 a1.sinks.k1.type = logger #管道选择memory,它可以把event内的数据放到内存当中进行数据传递 #如果选择File channel,它会把event内数据保存到本地磁盘,再从本地磁盘发送给sink #传递速度memory快,但在传递过程中机器断电等问题,会造成数据的丢失,所以要客观选择 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
1.在/opt/data/flumedata下创建文件log.txt并编辑添加数据
2.在flume安装目录下的conf目录下执行命令启动agent
bin/flume-ng agent -c conf -f conf/avro_logger.conf -n a1 -Dflume.root.logger=INFO,console
3.模拟发送avro在flume的bin目录下执行:
bin/flume-ng avro-client -c conf -H hadoop01 -p 22222 -F /opt/data/flumedata/log.txt
正式操作:
注意:2222端口号前面那个一定要先关掉,否则会被占用,然后保存并退出
接着第一步:在01新窗口下创建新文件目录并 cd /opt/data/flumedata下创建文件log.txt并编辑添加数据
回到原来01的窗口,启动agent服务
模拟发送avro在flume的bin目录下执行
发送的信息以logger日志的形式展示在控制台
2.Spooldirspooldir:source源,用于监控文件目录,收集文件目录数据
注意:
1)对于文件中要源源不断写入的这情况,不适合使用spooldir。看似控制台收集了数据,实际这个服务已经宕掉了,不会再变成.completed文件
2)对于已经监控的文件,如果有相同文件名再次放入到监控目录中,此时服务会报错,并不再进行监控。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /opt/data/spooldir a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
在/home/data目录下创建文件夹spooldir
启动
在spooldir中vim文件并添加内容并保存。发现flume日志中打印编辑内容。
打开01新窗口创建文件:
回到原来的01窗口启动服务:
收集的文件内容会在控制台展示
为什么没有显示完全?因为下沉点默认最多显示15个字节,可以设置更多以供显示收集的数据内容
3.采集目录到HDFS# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source ##注意:不能往监控目中重复丢同名文件 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /opt/data/spooldir #a1.sources.r1.fileHeader = true # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/ a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 3 a1.sinks.k1.hdfs.rollSize = 20 a1.sinks.k1.hdfs.rollCount = 5 a1.sinks.k1.hdfs.batchSize = 1 #获取时间 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
参数解析:
· rollInterval
默认值:30
hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒;
如果设置成0,则表示不根据时间来滚动文件;
注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;
· rollSize
默认值:1024
当临时文件达到该大小(单位:bytes)时,滚动成目标文件;
如果设置成0,则表示不根据临时文件大小来滚动文件;
· rollCount
默认值:10
当events数据达到该数量时候,将临时文件滚动成目标文件;
如果设置成0,则表示不根据events数据来滚动文件;
· round
默认值:false
对文件目录进行滚动。
是否启用时间上的“舍弃”,这里的“舍弃”,类似于“四舍五入”。
· roundValue
默认值:1
时间上进行“舍弃”的值;
· roundUnit
默认值:seconds
时间上进行“舍弃”的单位,包含:second,minute,hour
开始--编辑配置文件:
启动服务时自动收集数据并上传到hdfs上去:因为这是在hadoop的环境下启动的所以自动会上传
4.采集文件到HDFS
exec只能指定一个文件进行监控,监控的是源源不断写入的文件。
# Name the components on this agent a1.sources = r1 r2 r3 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/data/exec/test.log a1.sources.r2.type = exec a1.sources.r2.command = tail -F /opt/data/exec/test1.log a1.sources.r3.type = exec a1.sources.r3.command = tail -F /opt/data/exec/test2.log # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/exec/%y-%m-%d/%H%M/ a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 3 a1.sinks.k1.hdfs.rollSize = 20 a1.sinks.k1.hdfs.rollCount = 5 a1.sinks.k1.hdfs.batchSize = 1 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sources.r2.channels = c1 a1.sources.r3.channels = c1 a1.sinks.k1.channel = c1
开发shell脚本定时追加文件内容
mkdir -p /opt/servers/shells/ cd /opt/servers/shells/ vim exec.sh
#!/bin/bash while true do date >> /opt/data/exec/test.log; sleep 0.5; done
启动脚本
sh /opt/servers/shells/exec.sh5.TailDir的使用
同时监控多个文件的持续写入
固定文件:1.txt 2.txt
持续写入文件:test.log 3.txt
source:taildir
sink:hdfs
channel:memory
#source channel sink # 定义核心组件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #定义source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /opt/data/taildir/log1/.*txt.* a1.sources.r1.filegroups.f2 = /opt/data/taildir/log2/.*log.* a1.sources.r1.positionFile = /opt/data/taildir/taildir_position.json a1.sources.r1.fileHeader = true a1.sources.r1.headers.f1.key1 = value1 #定义sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/taildir/%y-%m-%d/%H%M/ a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.useLocalTimeStamp = false a1.sinks.k1.hdfs.rollInterval = 3 a1.sinks.k1.hdfs.rollSize = 20 a1.sinks.k1.hdfs.rollCount = 5 a1.sinks.k1.hdfs.batchSize = 1 a1.sinks.k1.hdfs.fileType = DataStream #定义channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #将channel和source、sink关联起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1



