
Flume: with a Kafka source and a Kafka sink together, data gets sunk back into the source's topic

I recently wanted to use Flume with an interceptor to process Kafka data, but found that the topic configured on the sink had no effect: the data was written back to the topic configured on the source, which started a loop — write it in, read it out, write it in again. After a long search I found an article that explains the cause in detail (thanks to that author). In short: when the Kafka source reads a record, it writes the source topic into the event's header, and the Kafka sink reads the destination topic from that header.
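The mechanism can be illustrated with a minimal, self-contained sketch (plain Java, no Flume dependency; `resolveTopic` is a hypothetical stand-in for the sink's actual topic-resolution logic, which prefers the header over its own configuration):

```java
import java.util.HashMap;
import java.util.Map;

public class TopicHeaderDemo {
    // Mimics the Kafka sink's behavior: the "topic" header, if present,
    // wins over the topic configured on the sink itself.
    static String resolveTopic(Map<String, String> headers, String configuredTopic) {
        String headerTopic = headers.get("topic");
        return headerTopic != null ? headerTopic : configuredTopic;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        // The source stamps the topic it consumed from into the header:
        headers.put("topic", "test");
        // Without an interceptor, the sink writes straight back to "test",
        // ignoring its configured "test_sink" — hence the loop:
        System.out.println(resolveTopic(headers, "test_sink")); // prints "test"
        // An interceptor that overwrites the header breaks the loop:
        headers.put("topic", "test_sink");
        System.out.println(resolveTopic(headers, "test_sink")); // prints "test_sink"
    }
}
```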

That article does not include full interceptor code, so here are my interceptor and configuration file for reference:

Configuration file:

sync.sources = r1
sync.channels = c1
sync.sinks = k1

# interceptor
sync.sources.r1.interceptors = i1
sync.sources.r1.interceptors.i1.type = AddMarkInterceptor$Builder
# preserveExisting defaults to TRUE (topic left unchanged) — it must be set to false
sync.sources.r1.interceptors.i1.preserveExisting = false
# Custom parameters
# Which header key to modify
sync.sources.r1.interceptors.i1.headerKey = topic
# The target topic the data should be sent to
sync.sources.r1.interceptors.i1.headerKeyValue = test_sink
# Parameters for tagging the data
sync.sources.r1.interceptors.i1.dataMarkName = mark
sync.sources.r1.interceptors.i1.dataMarkValue = hongan

# For each one of the sources, the type is defined
sync.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
sync.sources.r1.channels = c1
sync.sources.r1.batchSize = 100
sync.sources.r1.batchDurationMillis = 2000
sync.sources.r1.kafka.bootstrap.servers = sd-kafka001:9092
sync.sources.r1.kafka.topics = test
sync.sources.r1.kafka.consumer.group.id = test_001

# The channel can be defined as follows.
sync.channels.c1.type = memory
sync.channels.c1.capacity = 10000
sync.channels.c1.transactionCapacity = 10000
sync.channels.c1.byteCapacityBufferPercentage = 20
sync.channels.c1.byteCapacity = 800000

# Each sink's type must be defined
sync.sinks.k1.channel = c1
sync.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# The sink topic is set by the interceptor; setting it here has no effect
# sync.sinks.k1.kafka.topic = txy_test_sink
sync.sinks.k1.kafka.bootstrap.servers = sd-kafka001:9092
sync.sinks.k1.kafka.flumeBatchSize = 20
sync.sinks.k1.kafka.producer.acks = 1
sync.sinks.k1.kafka.producer.linger.ms = 1
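As an aside: if your Flume version is recent enough (1.7+), the Kafka sink exposes `allowTopicOverride` and `topicHeader` properties, so when you only need a fixed destination topic (no body rewriting), you may be able to skip the interceptor entirely. A sketch, assuming a Flume release that supports these properties:

```properties
# Tell the sink to ignore the topic header written by the source
sync.sinks.k1.kafka.topic = test_sink
sync.sinks.k1.allowTopicOverride = false
```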

Interceptor code:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;


public class AddMarkInterceptor implements Interceptor {
    // Custom parameters, populated from the agent configuration via Builder.configure()
    private final String headerKey;
    private final String headerKeyValue;
    private final String dataMarkName;
    private final String dataMarkValue;

    private AddMarkInterceptor(String headerKey, String headerKeyValue,
                               String dataMarkName, String dataMarkValue) {
        this.headerKey = headerKey;
        this.headerKeyValue = headerKeyValue;
        this.dataMarkName = dataMarkName;
        this.dataMarkValue = dataMarkValue;
    }

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // The header looks like {topic=test, partition=0, offset=33, timestamp=1646794542175}
        Map<String, String> headers = event.getHeaders();
        // Overwrite the topic set by the source with our own target topic
        headers.put(headerKey, headerKeyValue);
        // Put the modified header back on the event
        event.setHeaders(headers);
        // Decode the event body and add the mark field
        String source = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject sourceObj = JSONObject.parseObject(source);
        sourceObj.put(dataMarkName, dataMarkValue);
        String msg = JSON.toJSONString(sourceObj, SerializerFeature.WriteMapNullValue);
        // Put the modified body back on the event
        event.setBody(msg.getBytes(StandardCharsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        private String headerKey;
        private String headerKeyValue;
        private String dataMarkName;
        private String dataMarkValue;

        @Override
        public Interceptor build() {
            // Constructs this class; the names must match
            return new AddMarkInterceptor(headerKey, headerKeyValue, dataMarkName, dataMarkValue);
        }

        @Override
        public void configure(Context context) {
            // The interceptor receives its parameters through the Builder.
            // Each key passed to getString() is the name that follows
            // sync.sources.r1.interceptors.i1. in the configuration file.
            headerKey = context.getString("headerKey");
            headerKeyValue = context.getString("headerKeyValue");
            dataMarkName = context.getString("dataMarkName");
            dataMarkValue = context.getString("dataMarkValue");
        }
    }
}

Please credit www.mshxw.com when reprinting.
Original URL: https://www.mshxw.com/it/758071.html