
Three ways to deliver data from Flume to Kafka (file, directory, port)

Collect a file with Flume and save it to Kafka

Create the file flumeexec.conf:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe and configure the source component: r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/nana/text.log
 
# Describe and configure the sink component: k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumetopic
a1.sinks.k1.kafka.bootstrap.servers = hadoop1:9092,hadoop2:9092,hadoop3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
 
# Describe and configure the channel component; an in-memory channel is used here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and the sink to the channel (required, or the agent will not start)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start Flume:
bin/flume-ng agent -c conf -f conf/flumeexec.conf -n a1 -Dflume.root.logger=INFO,console
The agent starts and the topic is created automatically (assuming auto.create.topics.enable is on at the brokers).
Consume the data from Kafka:
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic flumetopic
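To see events flow end to end, append lines to the tailed file: `tail -F` hands each new line to the exec source, which becomes one Kafka record. A minimal sketch; `LOG` here is a local stand-in for the `/home/nana/text.log` path from the config above:

```shell
# LOG is a local stand-in for /home/nana/text.log from the config above.
LOG=./text.log
: > "$LOG"                      # start from an empty file for this demo
for i in 1 2 3; do
  echo "event $i" >> "$LOG"     # tail -F picks up each appended line
done
wc -l < "$LOG"                  # three lines written -> three events
```

Note that `tail -F` (capital F) keeps following the file across rotation, which is why the config uses it rather than `tail -f`.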

Collect from a directory to Kafka

Create the file flumetaildir.conf:

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type = TAILDIR
# Where the taildir source records its read positions (metadata)
a1.sources.r1.positionFile = /usr/local/src/flume/taildir_position.json
# File group to monitor; the path is a regex matching files ending in "log"
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/data/.*log
a1.sources.r1.fileHeader = true

a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop1:9092
a1.sinks.k1.kafka.topic = testTopic3
a1.sinks.k1.kafka.producer.compression.type = snappy

a1.channels.c1.type = file
# Checkpoint path
a1.channels.c1.checkpointDir = /usr/local/src/flume/filechannle/checkpointDir
# Data storage path
a1.channels.c1.dataDirs = /usr/local/src/flume/filechannle/dataDirs
# Maximum number of events the channel can buffer
a1.channels.c1.capacity = 1000
# Maximum number of events handed to the sink in one transaction
a1.channels.c1.transactionCapacity = 100


a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
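The positionFile configured above is how TAILDIR resumes after a restart: it is a JSON array with one `{inode, pos, file}` record per tailed file. A sketch of its shape, with made-up values:

```shell
# Illustrative sketch of the taildir position file's shape (values are made up).
cat > ./taildir_position.json <<'EOF'
[{"inode":123456,"pos":42,"file":"/usr/local/src/flume/data/app.log"}]
EOF
# On restart, Flume resumes each file from the recorded "pos" byte offset.
python3 -c 'import json; print(json.load(open("./taildir_position.json"))[0]["pos"])'  # prints 42
```

Deleting this file makes the agent re-read every matching file from the beginning, which is occasionally useful for replaying data.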

Start Flume:
bin/flume-ng agent -c conf -f conf/flumetaildir.conf -n a1 -Dflume.root.logger=INFO,console
The agent starts and the topic is created automatically (assuming auto.create.topics.enable is on at the brokers).
Consume the data from Kafka:
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic testTopic3
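To feed this pipeline, drop files whose names match the `.*log` pattern into the watched directory; TAILDIR picks up both newly created files and lines appended to existing ones. A sketch; `DATA_DIR` is a local stand-in for `/usr/local/src/flume/data` from the config:

```shell
# DATA_DIR is a local stand-in for /usr/local/src/flume/data from the config.
DATA_DIR=./flume-data
mkdir -p "$DATA_DIR"
echo "first line"  >> "$DATA_DIR/app.log"   # name matches the .*log regex -> tailed
echo "second line" >> "$DATA_DIR/app.log"
echo "ignored"     >> "$DATA_DIR/app.txt"   # does not match the regex -> ignored
ls "$DATA_DIR"
```

Because fileHeader = true in the config, each event also carries a `file` header with the absolute path of the file it came from.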

Collect from a network port to Kafka

Create the file flumenc.conf:

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=testnc
a1.sinks.k1.kafka.bootstrap.servers = hadoop1:9092,hadoop2:9092,hadoop3:9092
a1.sinks.k1.kafka.flumeBatchSize=20
a1.sinks.k1.kafka.producer.acks=1

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Start Flume:
bin/flume-ng agent -c conf -f conf/flumenc.conf -n a1 -Dflume.root.logger=INFO,console
The agent starts and the topic is created automatically (assuming auto.create.topics.enable is on at the brokers).
Consume the data from Kafka:
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic testnc
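With the agent running, every newline-terminated line sent to localhost:8888 becomes one event (the netcat source answers "OK" per line by default). A sketch; the actual send is commented out because it needs the live agent:

```shell
# Build a newline-delimited payload; the netcat source turns each line into one event.
printf 'hello flume\nsecond event\n' > ./payload.txt
wc -l < ./payload.txt           # two lines -> two events
# With the agent from flumenc.conf running, send it to the source:
#   nc localhost 8888 < ./payload.txt
```

Because the source binds to localhost, `nc` must run on the same host as the agent; bind to 0.0.0.0 to accept remote senders.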
