Java:jdk1.8.0_241
安装版本flume:apache-flume-1.9.0
安装包见同级目录下 apache-flume-1.9.0-bin
本文安装方法windows本地 下载flume https://flume.apache.org/download.html
用 rz 命令上传到Linux本地 /export/software
cd 到/export/server
将压缩包解压到/export/server
cd /export/software tar -zxvf ./apache-flume-1.9.0-bin.tar.gz -C /export/server配置修改
进入/export/server/apache-flume-1.9.0-bin/conf 下编辑 flume-env.sh.template 文件
cd /export/server/apache-flume-1.9.0-bin/conf # 将 flume-env.sh.template 重命名 mv flume-env.sh.template flume-env.sh # 修改 flume-env.sh 导入JAVA_HOME vim flume-env.sh
export JAVAA_HOME=/export/server/jdk
因为后面遇到了 java.lang.OutOfMemoryError: Java heap sparce的问题,所以在flume-env.sh下设置JVM相关运行参数的变量
export JAVA_OPTS="-Xms2048m -Xmx2048 -Dcom.sun.management.jmxremote"
并且对应修改了flume的启动脚本flume-ng中的参数默认是JAVA_OPTS="-Xmx20m"
vim ../bin/flume-ng # 与flume-env.sh文件中的参数大小对应 JAVA_OPTS="-Xmx2048m"
设置flume环境变量
# FLUME_HOME export FLUME_HOME=/export/server/apache-flume-1.9.0.bin export PATH=$PATH:$FLUME_HOME/bin本地测试 测试环境
Kafka:kafka_2.11-2.0.0
Hadoop:hadoop-2.6.0-cdh5.16.2
zookeeper:zookeeper-3.4.5-cdh5.16.2
测试准备启动zk
使用kafka自带的zookeeper
cd /export/server/zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka
cd /export/server/kafka bin/kafka-server-start.sh config/server.properties
启动hadoop
测试效果目的:用flume做消费者 消费Jndroid-06服务器Kafka中greatlibrary_xiaohongshu_note_1主题中的数据 然后放到HDFS上
flume配置文件开发
vim /export/server/apache-flume-1.9.0-bin/conf/Jndroid-06_node1 # Jndroid-06_node1 # Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = org.apache.flume.source.kafka.KafkaSource # 一次向channel中写入一批event的数量 a2.sources.r2.batchSize = 1000 # 一次put操作,间隔的最大时间,超过这个时间会报错 a2.sources.r2.batchDurationMillis = 2000 a2.sources.r2.kafka.bootstrap.servers = 192.168.3.156:9092 ## 消费者(组)id,同一id代表同一消费者组 a2.sources.r2.kafka.consumer.group.id = Washu20220211_2 ## 当groupid为新的时,earliest才会生效,从头开始消费 a2.sources.r2.kafka.consumer.auto.offset.reset = earliest ## topic (多个topic后+逗号跟topic名) a2.sources.r2.kafka.topics = greatlibrary_xiaohongshu_note_1 # 自动提交偏移量的时间间隔 #a2.sources.r2.kafka.consumer.auto.commit.interval.ms = 60000 # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://node1:8020/Xiaohongshu/%Y%m%d #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = greatlibrary_xiaohongshu_note_1- #是否对时间戳取整 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k2.hdfs.batchSize = 500 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #HDFS上的文件达到60秒生成一个新的文件 a2.sinks.k2.hdfs.rollInterval = 60 #HDFS上的文件达到128M时生成一个文件(设置每个文件的滚动大小) a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k2.hdfs.rollCount = 0 a2.sinks.k2.custom.encoding=UTF-8 # Use a channel which buffers events in memory a2.channels.c2.type = memory # # channel一次最多吐给sink多少 a2.channels.c2.transactionCapacity = 2000 # Amount of time (in sec) to wait for a put operation a2.channels.c2.keep-alive = 60 # channel的最大容量 a2.channels.c2.capacity = 1000000 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
本地机器添加hosts
vim /etc/hosts 192.168.x.xxx xxxxx-Kafka-xx kafka01
启动flume
# a2为agent名称,-c表示配置文件存储在conf/目录,-f flume要读取配置文件名称 # -D表示flume运行时动态修改flume.root.logger参数属性值,将高于info级别的日志打印在控制台,便于测试 bin/flume-ng agent -n a2 -c conf/ -f conf/Jndroid-06_node1 -Dflume.root.logger=INFO,console注意
1、flume对从Kafka中读取到的Event默认会在Event Header中添加3个属性partition,topic,timestamp。如partition=2, topic=testTopic3, timestamp=1529237228398。timestamp:默认不是事件时间,也不是入Kafka时间,是flume agent 所在机器的毫秒时间戳。
1、flume对从Kafka中读取到的Event默认会在Event Header中添加3个属性partition,topic,timestamp。如partition=2, topic=testTopic3, timestamp=1529237228398。timestamp:默认不是事件时间,也不是入Kafka时间,是flume agent 所在机器的毫秒时间戳。
2、kafka.topics.regex属性 可通过正则定义要读取的Topic 列表 (本文未涉及)



