Create the file flumeexec.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe and configure the source component: r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/nana/text.log

# Describe and configure the sink component: k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumetopic
a1.sinks.k1.kafka.bootstrap.servers = hadoop1:9092,hadoop2:9092,hadoop3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Describe and configure the channel component; an in-memory channel is used here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel (required, or the agent will not start)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume:
bin/flume-ng agent -c conf -f conf/flumeexec.conf -n a1 -Dflume.root.logger=INFO,console
Once startup succeeds, the topic is created automatically.
Consume the data from Kafka:
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic flumetopic
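To verify the exec-source pipeline end to end, append a few lines to the tailed file and watch them appear in the console consumer. A minimal sketch, using a temp file so it runs anywhere; on the real host substitute the path from the config, /home/nana/text.log:

```shell
# Append test lines to the tailed log; `tail -F` in the exec source
# picks them up and the KafkaSink forwards them to the flumetopic topic.
LOG="${LOG:-/tmp/text.log}"
for i in 1 2 3; do
  echo "test line $i" >> "$LOG"
done
tail -n 3 "$LOG"
```

Note that the exec source does not track position: if the agent restarts, lines written while it was down are not replayed, which is the motivation for the TAILDIR variant below.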
Create the file flumetaildir.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
# Where the read-position metadata is stored
a1.sources.r1.positionFile = /usr/local/src/flume/taildir_position.json
# Directories/files to monitor
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/data/.*log
a1.sources.r1.fileHeader = true

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop1:9092
a1.sinks.k1.kafka.topic = testTopic3
a1.sinks.k1.kafka.producer.compression.type = snappy

a1.channels.c1.type = file
# Checkpoint path
a1.channels.c1.checkpointDir = /usr/local/src/flume/filechannle/checkpointDir
# Data storage path
a1.channels.c1.dataDirs = /usr/local/src/flume/filechannle/dataDirs
# Maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
# Maximum number of events handed to the sink in one transaction
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume:
bin/flume-ng agent -c conf -f conf/flumetaildir.conf -n a1 -Dflume.root.logger=INFO,console
Once startup succeeds, the topic is created automatically.
Consume the data from Kafka:
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic testTopic3
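The positionFile above is what lets TAILDIR resume where it left off after a restart: it is a JSON array recording, for each tracked file, the inode and the byte offset already shipped. A minimal sketch of inspecting it; the sample record below is illustrative, not taken from a real run (a real taildir_position.json is written and updated by Flume itself):

```python
import json

# Illustrative content mimicking what Flume writes to taildir_position.json
sample = '[{"inode": 271635, "pos": 128, "file": "/usr/local/src/flume/data/app.log"}]'

for entry in json.loads(sample):
    # "pos" is the byte offset up to which this file has already been shipped;
    # deleting the position file forces Flume to re-read everything from the start
    print(f"{entry['file']}: inode={entry['inode']}, shipped up to byte {entry['pos']}")
```

Tracking by inode also means TAILDIR keeps reading correctly across log rotation, as long as the rotated file still matches the filegroups regex.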
Create the file flumenc.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = testnc
a1.sinks.k1.kafka.bootstrap.servers = hadoop1:9092,hadoop2:9092,hadoop3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume:
bin/flume-ng agent -c conf -f conf/flumenc.conf -n a1 -Dflume.root.logger=INFO,console
Once startup succeeds, the topic is created automatically.
Consume the data from Kafka. To feed the source, connect with `nc localhost 8888` and type lines; each newline-terminated line becomes one event.
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic testnc
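The netcat source treats each newline-terminated line as one event and (by default) acknowledges it with "OK". A minimal sketch of that exchange, with a throwaway local listener standing in for Flume so the snippet is self-contained (the real source listens on localhost:8888 per the config above; the stand-in uses an OS-assigned port):

```python
import socket
import threading

def fake_netcat_source(server, received):
    # Stand-in for the Flume netcat source: read one newline-terminated
    # event from the client and acknowledge it, as the real source does.
    conn, _ = server.accept()
    with conn:
        line = conn.makefile().readline()
        received.append(line.rstrip("\n"))
        conn.sendall(b"OK\n")

server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("127.0.0.1", 0))   # port 0: let the OS pick a free port for the demo
port = server.getsockname()[1]
server.listen(1)

received = []
t = threading.Thread(target=fake_netcat_source, args=(server, received))
t.start()

# Client side: this is what `nc localhost 8888` does when you type a line
with socket.create_connection(("127.0.0.1", port)) as client:
    client.sendall(b"hello kafka\n")
    ack = client.makefile().readline().strip()

t.join()
server.close()
print(received, ack)  # → ['hello kafka'] OK
```

Against the real agent, the same line then flows source → memory channel → KafkaSink, and shows up in the testnc console consumer.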



