Kafka消费Flume推送的数据Kafka生产数据推送到Flume
Kafka消费Flume推送的数据读取kafka数据只需更换KafkaSource
推送数据只需更换为KafkaSink
agent组件:
taildirSource -> memoryChannel -> kafkaSink
- 在flume/job目录下创建文件flume-taildir-kafka.conf
vim job/flume-taildir-kafka.conf
- 添加配置内容
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # taildir source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/flume/tail_dir.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/flume/files/.*file.* # kafka sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = master:9092 a1.sinks.k1.kafka.topic = first # 一批次拉取多少条日志 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 # memory channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 启动kafka消费者
bin/kafka-console-consumer.sh --topic first --bootstrap-server master:9092
- 启动flume
bin/flume-ng agent -n a1 -c conf/ -f job/flume-taildir-kafka.conf
- 在flume下的files文件中创建file3.txt并写入内容
vi files/text.3 # 写入以下内容 hello flume kafka 23333
新建文件结果
追加内容结果
读取kafka数据输出到控制台
agent:
kafkaSource -> memoryChannel -> loggerSink
- 创建配置文件 flume-kafka-file.conf
vi job/flume-kafka-file.conf
- 编写配置文件
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # kafka source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 50 a1.sources.r1.batchDurationMillis = 200 a1.sources.r1.kafka.bootstrap.servers = master:9092 a1.sources.r1.kafka.topics = first a1.sources.r1.kafka.consumer.group.id = custom.g.id # console sink a1.sinks.k1.type = logger # memory channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 启动flume
bin/flume-ng agent -n a1 -c conf/ -f job/flume-kafka-file.conf -Dflume.root.logger=INFO,console
- 启动kafka生产者
bin/kafka-console-producer.sh --broker-list master:9092 --topic first
- 测试



