栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka学习笔记 -- Kafka对接Flume

Kafka学习笔记 -- Kafka对接Flume

Kafka对接Flume

Kafka消费Flume推送的数据Kafka生产数据推送到Flume

读取kafka数据只需更换KafkaSource
推送数据只需更换为KafkaSink

Kafka消费Flume推送的数据

agent组件:
taildirSource -> memoryChannel -> kafkaSink

    在flume/job目录下创建文件flume-taildir-kafka.conf
vim job/flume-taildir-kafka.conf
    添加配置内容
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/flume/tail_dir.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/flume/files/.*file.*

# kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092
a1.sinks.k1.kafka.topic = first
# 一批次拉取多少条日志
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
    启动kafka消费者
bin/kafka-console-consumer.sh --topic first --bootstrap-server master:9092
    启动flume
bin/flume-ng agent -n a1 -c conf/ -f job/flume-taildir-kafka.conf
    在flume下的files文件中创建file3.txt并写入内容
vi files/text.3
# 写入以下内容
hello 
flume
kafka
23333

新建文件结果


追加内容结果

Kafka生产数据推送到Flume

读取kafka数据输出到控制台
agent:
kafkaSource -> memoryChannel -> loggerSink

    创建配置文件 flume-kafka-file.conf
vi job/flume-kafka-file.conf
    编写配置文件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# kafka source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = master:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id

# console sink
a1.sinks.k1.type = logger

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
    启动flume
bin/flume-ng agent -n a1 -c conf/ -f job/flume-kafka-file.conf -Dflume.root.logger=INFO,console
    启动kafka生产者
bin/kafka-console-producer.sh --broker-list master:9092 --topic first
    测试

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775796.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号