通过flume采集日志写入HDFS,采集通道如下图所示,由两个flume组成一个从日志文件到HDFS的通道:
上面的第二个flume是多余的,下图所示的通道和上图所示效果一样且更简洁,上图是为了学习,搞的更复杂一点。
PS: FileChannel原理:
http://archive.apache.org/dist/flume/2,解压即安装 3,删除一个jar包,这个包和hadoop中的包冲突
rm /module/flume/lib/guava-11.0.2.jar
删除之后要配置Hadoop_Home的环境变量
4,配置文件将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件
mv flume-env.sh.template flume-env.sh vi flume-env.sh export JAVA_HOME=/opt/module/jdk1.8.0_212二,项目经验之Flume组件选型
- 1)Source
Taildir Source相比Exec Source、Spooling Directory Source的优势 :
TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。不会丢数据,但是有可能会导致数据重复。
Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
Spooling Directory Source监控目录,支持断点续传。
batchSize设置最佳实际:Event 1K左右时,500-1000合适(默认为100)
- 2)Channel
采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。
三,编写配置文件读取日志写入Kafka注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中。
vim file-flume-kafka.conf
#为各组件命名 a1.sources = r1 a1.channels = c1 #描述source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /export/applog/log/app.* a1.sources.r1.positionFile = /export/soft/apache-flume-1.9.0-bin/taildir_position.json #a1.sources.r1.interceptors = i1 #a1.sources.r1.interceptors.i1.type = com.flume.interceptor.ETLInterceptor$Builder #描述channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092 a1.channels.c1.kafka.topic = topic_log a1.channels.c1.parseAsFlumeEvent = false #绑定source和channel以及sink和channel的关系 a1.sources.r1.channels = c1四,启动flume,执行程序
bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
这样启动,shell工具关闭后,进程会被杀死,采用nohup启动:
nohup bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &五,脚本启/停flume
#! /bin/bash
case $1 in
"start"){
for i in node1 node2
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"
done
};;
"stop"){
for i in node1 node2
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print $2}' | xargs -n1 kill -9 "
done
};;
esac
-Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1
表示把日志存入/opt/module/flume/log1.txt
2>&1,2表示错误日志,2>&1代表的意思是错误日志也像前面的flume日志一样存放到log1.txt中
六,编写配置文件读取Kafka写入HDFS"ps -ef | grep file-flume-kafka | grep -v grep |awk ‘{print $2}’ | xargs -n1 kill -9 "
注意:1,flume的进程名是Application,但其他进程也可能是这个名字,所以使用进程名不准确,最好使用配置文件名称file-flume-kafka
2, ‘{print $2}’,要加转义符,原因是不加的话表示取命令行传进来的参数,加了之后才表示取切割后的第二个值,直接在命令行执行时不需要,在脚本里面就需要这样
3,xargs -n1 kill -9 表示取前面命令的结果,然后传参给kill -9
vim kafka-flume-hdfs.conf
## 组件 a1.sources=r1 a1.channels=c1 a1.sinks=k1 ## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092 a1.sources.r1.kafka.topics=topic_log #解决零点漂移问题,代码见另一篇博客【Flume实战-解决零点漂移-时间戳拦截器】 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder ## channel1 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /export/module/flume/checkpoint/behavior1 a1.channels.c1.dataDirs = /export/module/flume/data/behavior1/ ## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/log/topic_log/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = log- a1.sinks.k1.hdfs.round = false #控制生成的小文件 #a1.sinks.k1.hdfs.rollInterval = 10 #a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 ## 控制输出文件是原生文件。 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop ## 拼装 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1七,消费者flume启停脚本
#! /bin/bash
case $1 in
"start"){
for i in node3
do
echo " --------启动 $i 消费flume-------"
ssh $i "nohup /export/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt 2>&1 &"
done
};;
"stop"){
for i in node3
do
echo " --------停止 $i 消费flume-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print $2}' | xargs -n1 kill"
done
};;
esac
异常记录
1,kafka集群运行异常,有节点没启动;
2,Exception in thread “SinkRunner-PollingRunner-DefaultSinkProcessor“ java.lang.NoSuchMethodError: c
jar包冲突,flume lib下的jar和hadoop jar包冲突,删除flume的lib下的低版本guava-11.0.2.jar,换成Hadoop下的高版本guava-27.0-jre.jar。直接删除,然后把hadoop下的jar拷贝过来,或者配置HADOOP_HOME也可以。
项目经验 0,排查问题注意查看日志,日志中的报错信息非常重要;
如果有拦截器,重点检查拦截器;
从后往前排查数据是否存在:FileChannel设置的中间文件目录中是否有数据、kafka中是否有数据、数据源是否有数据
如果数量特别大,短时间内会生成大量数据,则可以按小时滚动生成日志文件,可以使用hdfs.round/hdfs.roundValue/hdfs.roundUnit
2)解决方案步骤ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
- 修改/opt/module/flume/conf/flume-env.sh文件中增加如下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"3)Flume内存参数设置及优化
JVM heap一般设置为4G或更高
-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;
-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc



