项目中需要将用户的行为数据或者其他数据放入大数据仓库,已有kafka服务。
解决方案我们可以通过flume获取kafka实时数据并转存储到hdfs。
转存到hdfs后,再通过load data命令加载到Hive表中,hive再处理用户行为数据,最终输出到mysql呈现到用户端。
具体步骤一. 安装部署Hadoop并启动Hadoop,具体步骤见:Windows10 安装Hadoop3.3.0_xieedeni的博客-CSDN博客
Windows10安装Hive3.1.2_xieedeni的博客-CSDN博客
说明:这里的版本本人安装的是Hadoop3.3.0,kafka是腾讯云,Flume这里建议安装flume1.9
二. Windows下安装Flume
1.下载flume1.9
flume官方下载地址是http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
这个地址下载速度慢的话,可以使用镜像资源地址:https://download.csdn.net/download/xieedeni/24882711
2.解压apache-flume-1.9.0-bin
3.配置flume环境变量
三. flume配置文件
1.创建flume连接kafka到hdfs配置文件%FLUME%/conf/kafka2hdfs.conf
agent.sources = kafka_source agent.channels = mem_channel agent.sinks = hdfs_sink # 以下配置 source agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource agent.sources.kafka_source.channels = mem_channel agent.sources.kafka_source.batchSize = 5000 agent.sources.kafka_source.kafka.bootstrap.servers = kafka1:6003 agent.sources.kafka_source.kafka.topics = flume-collect agent.sources.kafka_source.kafka.consumer.group.id = group-1 # kafka访问协议 agent.sources.kafka_source.kafka.consumer.security.protocol = SASL_PLAINTEXT agent.sources.kafka_source.kafka.consumer.sasl.mechanism = PLAIN agent.sources.kafka_source.kafka.consumer.sasl.kerberos.service.name = kafka # 以下配置 sink agent.sinks.hdfs_sink.type = hdfs agent.sinks.hdfs_sink.channel = mem_channel agent.sinks.hdfs_sink.hdfs.path = hdfs://127.0.0.1:9000/warehouse/dd/bigdata/ods/tmp/applogs/%Y-%m-%d agent.sinks.hdfs_sink.hdfs.filePrefix = ods_event_log- agent.sinks.hdfs_sink.hdfs.fileSuffix = .log agent.sinks.hdfs_sink.hdfs.rollSize = 0 agent.sinks.hdfs_sink.hdfs.rollCount = 0 agent.sinks.hdfs_sink.hdfs.rollInterval = 3600 agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30 agent.sinks.hdfs_sink.hdfs.fileType=DataStream agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp=true agent.sinks.hdfs_sink.hdfs.writeFormat=Text # 以下配置 channel agent.channels.mem_channel.type = memory agent.channels.mem_channel.capacity = 100000 agent.channels.mem_channel.transactionCapacity = 10000
参数说明:
a.好好研究下官方文档,不然过程中真的会遇到很多坑Flume 1.9.0 User Guide — Apache Flume
b.kafka协议,真是坑,网上一堆资料唯独这个介绍的不够全
# kafka访问协议 agent.sources.kafka_source.kafka.consumer.security.protocol = SASL_PLAINTEXT agent.sources.kafka_source.kafka.consumer.sasl.mechanism = PLAIN agent.sources.kafka_source.kafka.consumer.sasl.kerberos.service.name = kafka
大家看到这里的kafka协议使用的是SASL_PLAINTEXT,如果需要其他方式请参看官方文档啊。
2.既然使用了protocol协议为SASL_PLAINTEXT,则需要如下设置
a.复制%FLUME%/conf/flume-env.sh.template命名为flume-env.sh还放到这个文件夹,内容为:
export JAVA_HOME=D:workjdk1.8.0_291
b.复制%FLUME%/conf/flume-env.ps1.template命名为flume-env.ps1还放到这个文件夹,内容为:
$JAVA_OPTS="-Djava.security.auth.login.config=D:worksoftapache-flume-1.9.0-binconfkafka_client_jaas.conf" $FLUME_CLASSPATH="D:worksoftapache-flume-1.9.0-binlib"
这里涉及到一个关键的文件kafka_client_jaas.conf,是用于kafka公网接入方式的protocol协议为SASL_PLAINTEXT
c.创建%FLUME%/conf/kafka_client_jaas.conf文件,还是放在conf下,内容为:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="ckafka-123#kafka"
password="123";
};
这里的username为“实例id#用户名”
四. 启动flume
启动的前提是:kafka服务已启动,topic已创建;hadoop服务已启动并创建了database,hadoop文件需要有权限。
启动命令:
cd %FLUME_HOME%/bin flume-ng agent -c %FLUME_HOME%/conf -n agent -f %FLUME_HOME%/conf/kafka2hdfs.conf &
参数 作用 举例
–conf 或 -c 指定配置文件夹,包含flume-env.sh和log4j的配置文件 –conf conf
–conf-file 或 -f 配置文件地址 –conf-file conf/flume.conf
–name 或 -n agent名称 –name a1
启动成功:
如果没有具体的日志信息,请修改%FLUME%/conf/log4j.properties
五. 测试
kafka生成一条消息,flume消费落地到hdfs
小结本人新手,比较笨,为了实现这个功能,研究花费近3天时间,中间遇到了很多坑。通过查阅资料,网上的资料都不够完整,这种东西就是难者不会会者不难。真的达到了目的,反而觉得出现的问题真是不难,但过程中却是处处碰壁。所以记录下遇到的问题,以供以后查阅,也分享给需要的小伙伴们。别放弃,阳光总在风雨后。
踩到的坑1.启动flume后,就运行到(lifecycleSupervisor-1-0) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.
后面没有再输出内容,也没有提示是否连接到了kafka的topic。如下图:
图中没有任何报错信息,kafka生产消息后flume也接收不到,没有任何响应。
原因:检查kafka是否开启了安全策略,如果开启,需要设置protocol
# kafka访问协议 agent.sources.kafka_source.kafka.consumer.security.protocol = SASL_PLAINTEXT agent.sources.kafka_source.kafka.consumer.sasl.mechanism = PLAIN agent.sources.kafka_source.kafka.consumer.sasl.kerberos.service.name = kafka
方法见步骤三。
这里是个新手坑,如果遗漏设置的话,flume就连接不到kafka。
2.配置文件无误且连接到了kafka,flume接收消息后落地到hdfs报错
[ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
恭喜你,这应该是最后一个坑了。
原因:这是jar包冲突
解决办法是把Hadoop的hadoop-3.3.0/share/hadoop/common/lib/guava-27.0-jre.jar拷贝到Flume的%FLUME_HOME%/lib中,并删除Flume自带的%FLUME_HOME%/lib/guava-11.0.2.jar,删除后再次重启flume就行了。



