
A Classic Example: Collecting Logs into Kafka with Flume


Environment:

The technologies involved are Flume, Kafka, and ZooKeeper.

Steps:

1. Build the agent

# Agent "train": one spooldir source, one file channel, one Kafka sink
train.sources=trainSource
train.channels=trainChannel
train.sinks=trainSink

# Source: watch a spooling directory for dated CSV files
train.sources.trainSource.type=spooldir
train.sources.trainSource.spoolDir=/opt/kb15tmp/flumelogfile/train
train.sources.trainSource.deserializer=LINE
train.sources.trainSource.deserializer.maxLineLength=320000
train.sources.trainSource.includePattern=train_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv
# Interceptor: drop the CSV header row (lines beginning with "user")
train.sources.trainSource.interceptors=head_filter
train.sources.trainSource.interceptors.head_filter.type=regex_filter
train.sources.trainSource.interceptors.head_filter.regex=^user
train.sources.trainSource.interceptors.head_filter.excludeEvents=true

# Channel: durable file channel
train.channels.trainChannel.type=file
train.channels.trainChannel.checkpointDir=/opt/kb15tmp/checkpoint/train
train.channels.trainChannel.dataDirs=/opt/kb15tmp/checkpoint/data/train

# Sink: publish events to the Kafka topic "train"
train.sinks.trainSink.type=org.apache.flume.sink.kafka.KafkaSink
train.sinks.trainSink.batchSize=640
train.sinks.trainSink.brokerList=192.168.91.180:9092
train.sinks.trainSink.topic=train

# Wire the source and the sink to the channel
train.sources.trainSource.channels=trainChannel
train.sinks.trainSink.channel=trainChannel
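Before starting the agent, it helps to create the directories the configuration references: the spooldir source will not start if its directory is missing (the file channel can create its own directories, but pre-creating them is harmless). A minimal sketch, using the paths from the agent configuration:

```shell
# Create the spooling, checkpoint, and data directories referenced by
# the agent config; the spooldir source fails if its directory is absent.
mkdir -p /opt/kb15tmp/flumelogfile/train
mkdir -p /opt/kb15tmp/checkpoint/train
mkdir -p /opt/kb15tmp/checkpoint/data/train
```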

2. Start ZooKeeper and Kafka

Start ZooKeeper:
zkServer.sh start

Start Kafka:
nohup kafka-server-start.sh /opt/soft/kafka211/config/server.properties &

3. Start a consumer

First create the topic:
kafka-topics.sh --create --zookeeper 192.168.91.180:2181 --topic train --partitions 1 --replication-factor 1

Then consume from it:
kafka-console-consumer.sh --bootstrap-server 192.168.91.180:9092 --topic train --from-beginning

4. Start Flume
./bin/flume-ng agent --name train --conf conf/ --conf-file conf/KB15conf/train.conf -Dflume.root.logger=INFO,console

5. Copy the log file to be consumed into the spooling directory
cp train.csv /opt/kb15tmp/flumelogfile/train/train_2021-12-27.csv
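Note that the copy renames the file to a dated name so that it matches the source's includePattern; only matching names are picked up. You can sanity-check a candidate name against that pattern locally (this grep form, with anchors and an escaped dot added here, approximates the Java regex Flume applies):

```shell
# Check a candidate file name against the includePattern locally;
# grep prints the name and exits 0 when it matches.
echo "train_2021-12-27.csv" | grep -E '^train_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv$'
# → train_2021-12-27.csv
```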

When reposting, please cite the source: www.mshxw.com
本文地址:https://www.mshxw.com/it/695957.html