栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flume安装及flume消费kafaka数据到hdfs

flume安装及flume消费kafaka数据到hdfs

flume安装 安装环境

Java:jdk1.8.0_241

安装版本

flume:apache-flume-1.9.0

安装包见同级目录下 apache-flume-1.9.0-bin

本文安装方法

windows本地 下载flume https://flume.apache.org/download.html

用 rz 命令上传到Linux本地 /export/software

cd 到/export/server

将压缩包解压到/export/server

cd /export/software
tar -zxvf ./apache-flume-1.9.0-bin.tar.gz -C /export/server
配置修改

进入/export/server/apache-flume-1.9.0-bin/conf 下编辑 flume-env.sh.template 文件

cd /export/server/apache-flume-1.9.0-bin/conf
# 将 flume-env.sh.template 重命名
mv flume-env.sh.template flume-env.sh
# 修改 flume-env.sh 导入JAVA_HOME
vim flume-env.sh
export JAVAA_HOME=/export/server/jdk

因为后面遇到了 java.lang.OutOfMemoryError: Java heap sparce的问题,所以在flume-env.sh下设置JVM相关运行参数的变量

export JAVA_OPTS="-Xms2048m -Xmx2048 -Dcom.sun.management.jmxremote"

并且对应修改了flume的启动脚本flume-ng中的参数默认是JAVA_OPTS="-Xmx20m"

vim ../bin/flume-ng
# 与flume-env.sh文件中的参数大小对应
JAVA_OPTS="-Xmx2048m"

设置flume环境变量

# FLUME_HOME
export FLUME_HOME=/export/server/apache-flume-1.9.0.bin
export PATH=$PATH:$FLUME_HOME/bin
本地测试 测试环境

Kafka:kafka_2.11-2.0.0

Hadoop:hadoop-2.6.0-cdh5.16.2

zookeeper:zookeeper-3.4.5-cdh5.16.2

测试准备

启动zk

使用kafka自带的zookeeper

cd /export/server/zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka

cd /export/server/kafka
bin/kafka-server-start.sh config/server.properties

启动hadoop

测试效果

目的:用flume做消费者 消费Jndroid-06服务器Kafka中greatlibrary_xiaohongshu_note_1主题中的数据 然后放到HDFS上

flume配置文件开发

vim /export/server/apache-flume-1.9.0-bin/conf/Jndroid-06_node1

# Jndroid-06_node1

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
# 一次向channel中写入一批event的数量
a2.sources.r2.batchSize = 1000
# 一次put操作,间隔的最大时间,超过这个时间会报错
a2.sources.r2.batchDurationMillis = 2000
a2.sources.r2.kafka.bootstrap.servers = 192.168.3.156:9092
## 消费者(组)id,同一id代表同一消费者组
a2.sources.r2.kafka.consumer.group.id = Washu20220211_2
## 当groupid为新的时,earliest才会生效,从头开始消费
a2.sources.r2.kafka.consumer.auto.offset.reset = earliest
## topic (多个topic后+逗号跟topic名)
a2.sources.r2.kafka.topics = greatlibrary_xiaohongshu_note_1
# 自动提交偏移量的时间间隔
#a2.sources.r2.kafka.consumer.auto.commit.interval.ms = 60000

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://node1:8020/Xiaohongshu/%Y%m%d
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = greatlibrary_xiaohongshu_note_1-
#是否对时间戳取整
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 500
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#HDFS上的文件达到60秒生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#HDFS上的文件达到128M时生成一个文件(设置每个文件的滚动大小)
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.custom.encoding=UTF-8

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
# # channel一次最多吐给sink多少
a2.channels.c2.transactionCapacity = 2000
# Amount of time (in sec) to wait for a put operation
a2.channels.c2.keep-alive = 60
# channel的最大容量
a2.channels.c2.capacity = 1000000

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

本地机器添加hosts

vim /etc/hosts

192.168.x.xxx   xxxxx-Kafka-xx  kafka01

启动flume

# a2为agent名称,-c表示配置文件存储在conf/目录,-f flume要读取配置文件名称
# -D表示flume运行时动态修改flume.root.logger参数属性值,将高于info级别的日志打印在控制台,便于测试
bin/flume-ng agent -n a2 -c conf/ -f conf/Jndroid-06_node1 -Dflume.root.logger=INFO,console
注意

1、flume对从Kafka中读取到的Event默认会在Event Header中添加3个属性partition,topic,timestamp。如partition=2, topic=testTopic3, timestamp=1529237228398。timestamp:默认不是事件时间,也不是入Kafka时间,是flume agent 所在机器的毫秒时间戳。

1、flume对从Kafka中读取到的Event默认会在Event Header中添加3个属性partition,topic,timestamp。如partition=2, topic=testTopic3, timestamp=1529237228398。timestamp:默认不是事件时间,也不是入Kafka时间,是flume agent 所在机器的毫秒时间戳。

2、kafka.topics.regex属性 可通过正则定义要读取的Topic 列表 (本文未涉及)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/743312.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号