1. Avro (a data serialization system)
Create a configuration file under /opt/servers/flume-1.9.0/conf:
vim avro_logger.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1. Create the file log.txt under /opt/data/flumedatas and add some data to it.
2. From the Flume install directory, start the agent:
bin/flume-ng agent -c conf -f conf/avro_logger.conf -n a1 -Dflume.root.logger=INFO,console
3. Simulate sending avro events; from the Flume install directory run:
bin/flume-ng avro-client -c conf -H hadoop01 -p 22222 -F /opt/data/flumedatas/log.txt

2. Spooldir
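The three steps above can be sketched end to end. The /tmp path and sample lines below are placeholders (the tutorial uses /opt/data/flumedatas), and the avro-client command assumes the agent from avro_logger.conf is already running:

```shell
# Step 1: create a sample log file (illustrative path)
mkdir -p /tmp/flumedatas
printf 'hello avro\nsecond line\n' > /tmp/flumedatas/log.txt

# Verify the contents that will be shipped
cat /tmp/flumedatas/log.txt

# Step 3 (run only with the agent up):
# bin/flume-ng avro-client -c conf -H hadoop01 -p 22222 -F /tmp/flumedatas/log.txt
```

Each line of the file becomes one avro event, which the logger sink then prints on the agent's console.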
spooldir: a source type that monitors a directory for newly placed files.
Notes:
1) Spooldir is not suitable for files that are still being written to continuously.
2) If a file with the same name as an already-ingested file is placed into the monitored directory again, the agent throws an error and stops monitoring.
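Behind note 2) is the source's completion-marking behavior: by default, once a file is fully ingested the spooldir source renames it in place with a .COMPLETED suffix. The following optional properties (a sketch, reusing the r1 source name from this tutorial) control that behavior:

```
a1.sources.r1.fileSuffix = .COMPLETED   # suffix appended to fully-ingested files (default)
a1.sources.r1.deletePolicy = never      # "immediate" deletes ingested files instead of renaming
a1.sources.r1.ignorePattern = ^$        # regex of file names to skip entirely
```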
Create a file under /opt/servers/flume-1.9.0/conf:
vim spooldir_log.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/data/spooldir

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Create the directory: mkdir /opt/data/spooldir, then create files in it (vim 1.log, vim 2.txt), add arbitrary content, and save.
- Start the agent: bin/flume-ng agent -c conf/ -f conf/spooldir_log.conf -n a1 -Dflume.root.logger=INFO,console
The content you added shows up in the Flume console log.
3. Spooldir to HDFS
Edit a file in the conf directory:
vim spooldir_hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Note: do not drop files with duplicate names into the monitored directory
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/data/spooldir
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type of generated files: the default is SequenceFile; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
bin/flume-ng agent -c conf/ -f conf/spooldir_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
Notes:
1. Hadoop must be running before you start the agent.
2. /opt/data/spooldir must not contain duplicate (already-ingested) files, otherwise the agent fails to start.
Parameter reference:
· rollInterval
Default: 30
Interval (in seconds) after which the HDFS sink rolls the temporary file into the final target file.
Set to 0 to disable time-based rolling.
Note: "rolling" means the HDFS sink renames the temporary file to the final target file and opens a new temporary file for writing.
· rollSize
Default: 1024
Roll the temporary file into the target file once it reaches this size (in bytes).
Set to 0 to disable size-based rolling.
· rollCount
Default: 10
Roll the temporary file into the target file once it has received this many events.
Set to 0 to disable event-count-based rolling.
· round
Default: false
Whether to round down the event timestamp used when building the HDFS directory path. "Rounding" here is like rounding off, but always truncates downward to the nearest roundValue/roundUnit boundary.
· roundValue
Default: 1
The granularity to which the time is rounded down.
· roundUnit
Default: seconds
The unit used for rounding down: second, minute, or hour.
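As a worked example of round/roundValue/roundUnit: with roundValue = 10 and roundUnit = minute (as in the config above), an event stamped 10:37 is written under the 10:30 directory, because the minute is truncated down to the nearest multiple of 10. The truncation is just integer division (a sketch; 37 is an arbitrary sample minute):

```shell
minute=37
roundValue=10
rounded=$(( minute / roundValue * roundValue ))
echo "$rounded"   # 30 -> %H%M in hdfs.path becomes 1030 instead of 1037
```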
4. Collecting a file into HDFS (exec source)
exec can monitor only a single, specified file, one that is continuously appended to.
Create the file /opt/data/exec/test.log.
Edit a file in the conf directory:
vim exec_hdfs.conf
with the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/exec/test.log

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type of generated files: the default is SequenceFile; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
bin/flume-ng agent -c conf/ -f conf/exec_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
Develop a shell script that keeps appending to the file:
mkdir -p /opt/servers/shell/
cd /opt/servers/shell/
vim exec.sh

#!/bin/bash
while true
do
  date >> /opt/data/exec/test.log
  sleep 0.5
done
Create the directory:
mkdir -p /opt/data/taillogs
Run the script:
sh exec.sh

5. TailDir
TailDir monitors continuous appends to multiple files at once (e.g. 1.log and 2.log).
vim taildir_logger.conf
# Name the source, channel and sink components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source: taildir
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/data/flumedatas/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/data/flumedatas/taildir1/a.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /opt/data/flumedatas/taildir2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000

# sink: logger
a1.sinks.k1.type = logger

# channel: memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Bind the three components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
bin/flume-ng agent -c conf/ -f conf/taildir_logger.conf -n a1 -Dflume.root.logger=INFO,console
Keep writing data into /opt/data/flumedatas/taildir2/1.log and verify that it is picked up.
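TAILDIR can resume where it left off after a restart because it records per-file read offsets in the positionFile. After some data has been ingested, taildir_position.json holds one record per tracked file, roughly like the following (the inode and pos values here are illustrative):

```
[{"inode": 527321, "pos": 42, "file": "/opt/data/flumedatas/taildir1/a.log"},
 {"inode": 527322, "pos": 128, "file": "/opt/data/flumedatas/taildir2/1.log"}]
```

Deleting this file makes the source re-read the monitored files from the beginning.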
6. Multi-agent chaining (avro)
Node layout:
Hadoop01: JDK, Hadoop, Flume
Hadoop02: JDK, Flume
Hadoop03: JDK, Flume
Just copy the Flume directory already installed on hadoop01 to the corresponding location on the hadoop02 and hadoop03 nodes.
scp -r flume-1.9.0/ hadoop02:$PWD
scp -r flume-1.9.0/ hadoop03:$PWD

1. hadoop01
vim http_avro.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. hadoop02
vim avro_avro.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. hadoop03
vim avro_log.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agents in order, beginning with hadoop03:
hadoop03 bin/flume-ng agent -c conf/ -f conf/avro_log.conf -n a1 -Dflume.root.logger=INFO,console
hadoop02 bin/flume-ng agent -c conf/ -f conf/avro_avro.conf -n a1 -Dflume.root.logger=INFO,console
hadoop01 bin/flume-ng agent -c conf/ -f conf/http_avro.conf -n a1 -Dflume.root.logger=INFO,console
hadoop01 forwards to hadoop02, and hadoop02 forwards to hadoop03, which is the final sink node.
Send test data to hadoop01:
curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://hadoop01:22222
7. Fan-in

1. hadoop01
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. hadoop02
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. hadoop03
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

8. Fan-out

1. hadoop01
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 22222

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

2. hadoop02
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. hadoop03
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
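The fan-out on hadoop01 works because a source copies each event into every channel listed in a1.sources.r1.channels; this duplication is done by the channel selector, and the replicating selector is the default. Making the choice explicit, or switching to header-based routing, is one line (a sketch):

```
# default: every event is duplicated into both c1 and c2
a1.sources.r1.selector.type = replicating
# alternative: route each event to one channel based on a header value
# a1.sources.r1.selector.type = multiplexing
```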



