当kafkasource和kafkasink一起使用时, 传输到kafkasink的数据,被传到了kafkasource。
即陷入死循环,从 kafkasource 读取的数据被flume 重新传到kafkasource。
当flume从kafka里读取的数据时,消息上会有个event headers结构。在events headers中会自动带上,topic:‘topic名称’。
如:
而kafka sink的 中 allowTopicOveride参数默认为true,即会使用event headers中的topic覆盖我们在kafka sink 配置的topic。
所以flume在输出的时候,会优先使用从event header中读取到的topic,其次才是读取 Sink端配置的topic。
导致flume在输出的时候kafka sink的topic被覆盖,数据被输出到kafka source的topic中了。
如果kafka sink的topic是固定的,已知的。可以将allowTopicOverride参数设置为false,禁止header中的topic覆盖sink配置的topic值
sink 配置设置:
a1.sinks.kafkaSink.allowTopicOverride = false
配置文件完整代码
#agent 的名称 #指定source组件,channel组件和sink组件的名称 # Name the components on this agent agent.sources = kafkaSource agent.channels = fileChannl agent.sinks = kafkaSink # 把组件连接起来 agent.sources.kafkaSource.channels = fileChannl agent.sinks.kafkaSink.channel = fileChannl #配置source组件 agent.sources.kafkaSource.type= org.apache.flume.source.kafka.KafkaSource agent.sources.kafkaSource.batchSize= 1000 agent.sources.kafkaSource.batchDurationMillis = 1000 agent.sources.kafkaSource.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092 agent.sources.kafkaSource.kafka.topics = all_type_data_r2p40_2 agent.sources.kafkaSource.kafka.consumer.group.id = flume_con_id_1 #配置channel组件 agent.channels.fileChannl.type = memory agent.channels.fileChannl.capacity=2048 agent.channels.fileChannl.transactionCapacity=1000 # 配置sink组件 agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink # 指定topic名称 agent.sinks.kafkaSink.kafka.topic = test # 指定kafka地址,多个节点地址使用逗号分割 agent.sinks.kafkaSink.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092 agent.sinks.kafkaSink.allowTopicOverride = false方式2
当kafka sink中的topic是未知的,动态的。即可能需要根据消息的内容,将消息分发到不同的topic中时,可以使用拦截器覆盖 header中的topic值。
使用正则表达式拦截器,抽取消息正文内容的值,将其覆盖到header 的topic中。
#配置拦截器 # 定义拦截器,抽取数据 agent.sources.kafkaSource.interceptors = i2 i1 # 设置拦截器类型,i1为正则表达式, agent.sources.kafkaSource.interceptors.i1.type = regex_extractor # 配置指定的数据,这样设置会在数据的header中增加topic=test agent.sources.kafkaSource.interceptors.i1.regex = "topicName":"(\w+)" agent.sources.kafkaSource.interceptors.i1.serializers = s1 agent.sources.kafkaSource.interceptors.i1.serializers.s1.name = topic # 避免数据中没有topicName字段,给这些数据赋一个默认topic【注意:这个拦截器必须设置】 agent.sources.kafkaSource.interceptors.i2.type = static agent.sources.kafkaSource.interceptors.i2.key = topic agent.sources.kafkaSource.interceptors.i2.preserveExisting = false agent.sources.kafkaSource.interceptors.i2.value = test
完整配置文件
#agent 的名称 #指定source组件,channel组件和sink组件的名称 # Name the components on this agent agent.sources = kafkaSource agent.channels = fileChannl agent.sinks = kafkaSink # 把组件连接起来 agent.sources.kafkaSource.channels = fileChannl agent.sinks.kafkaSink.channel = fileChannl #配置source组件 agent.sources.kafkaSource.type= org.apache.flume.source.kafka.KafkaSource agent.sources.kafkaSource.batchSize= 1000 agent.sources.kafkaSource.batchDurationMillis = 1000 agent.sources.kafkaSource.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092 agent.sources.kafkaSource.kafka.topics = all_type_data_r2p40_2 agent.sources.kafkaSource.kafka.consumer.group.id = flume_con_id_1 #配置拦截器 # 定义拦截器,抽取数据 agent.sources.kafkaSource.interceptors = i2 i1 # 设置拦截器类型,i1为正则表达式, agent.sources.kafkaSource.interceptors.i1.type = regex_extractor # 配置指定的数据,这样设置会在数据的header中增加topic=test agent.sources.kafkaSource.interceptors.i1.regex = "topicName":"(\w+)" agent.sources.kafkaSource.interceptors.i1.serializers = s1 agent.sources.kafkaSource.interceptors.i1.serializers.s1.name = topic # 避免数据中没有topicName字段,给这些数据赋一个默认topic【注意:这个拦截器必须设置】 agent.sources.kafkaSource.interceptors.i2.type = static agent.sources.kafkaSource.interceptors.i2.key = topic agent.sources.kafkaSource.interceptors.i2.preserveExisting = false agent.sources.kafkaSource.interceptors.i2.value = test #配置channel组件 agent.channels.fileChannl.type = file agent.channels.fileChannl.checkpointDir = /data/filechannle_data/all_type_data/checkpoint agent.channels.kafka2HdfsShow.dataDirs = /data/filechannle_data/all_type_data/data # 配置sink组件 agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink # 指定topic名称 agent.sinks.kafkaSink.kafka.topic = default # 指定kafka地址,多个节点地址使用逗号分割 agent.sinks.kafkaSink.kafka.bootstrap.servers =bigdata01:9092,bigdata02:9092,bigdata03:9092
若配置不生效,请注意参数是否写错,如参数大小写不对等。



