最近想要使用 Flume 加拦截器对 Kafka 数据进行处理,但是使用时发现 sink 中配置的 topic 不起作用,数据被 sink 到了 source 中配置的 topic,于是就开始了循环:写进去、读出来、再写进去。查了老半天,查到了这篇文章详细解释了原因,感谢这位作者。总结一下就是:source 读取数据的时候会将 source 的 topic 写进 event 的 header 中,sink 的时候会从这个 header 中读取 topic。
这篇文章里没有详细的拦截器代码,下面附上我写的拦截器以及配置文件以供参考:
配置文件:
# ---- Agent component names ----
sync.sources = r1
sync.channels = c1
sync.sinks = k1

# ---- Interceptor ----
sync.sources.r1.interceptors = i1
sync.sources.r1.interceptors.i1.type = AddMarkInterceptor$Builder
# preserveExisting defaults to TRUE (existing topic header kept); must be false here
sync.sources.r1.interceptors.i1.preserveExisting = false
# Custom parameters:
# which header key to overwrite
sync.sources.r1.interceptors.i1.headerKey = topic
# destination topic the data should be sent to
sync.sources.r1.interceptors.i1.headerKeyValue = test_sink
# parameters for the body-marking step
sync.sources.r1.interceptors.i1.dataMarkName = mark
sync.sources.r1.interceptors.i1.dataMarkValue = hongan

# For each one of the sources, the type is defined
sync.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
sync.sources.r1.channels = c1
sync.sources.r1.batchSize = 100
sync.sources.r1.batchDurationMillis = 2000
sync.sources.r1.kafka.bootstrap.servers = sd-kafka001:9092
sync.sources.r1.kafka.topics = test
sync.sources.r1.kafka.consumer.group.id = test_001

# The channel can be defined as follows.
sync.channels = c1
sync.channels.c1.type = memory
sync.channels.c1.capacity = 10000
sync.channels.c1.transactionCapacity = 10000
sync.channels.c1.byteCapacityBufferPercentage = 20
sync.channels.c1.byteCapacity = 800000

# Each sink's type must be defined
sync.sinks.k1.channel = c1
sync.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# The interceptor already sets the sink topic; this key would be ignored
# sync.sinks.k1.kafka.topic = txy_test_sink
sync.sinks.k1.kafka.bootstrap.servers = sd-kafka001:9092
sync.sinks.k1.kafka.flumeBatchSize = 20
sync.sinks.k1.kafka.producer.acks = 1
sync.sinks.k1.kafka.producer.linger.ms = 1
拦截器代码:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.codec.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.Map;
/**
 * Flume interceptor that (1) overwrites the {@code topic} event header so a
 * KafkaSink does not echo events back to the KafkaSource's topic, and
 * (2) injects a mark key/value into the JSON event body.
 *
 * <p>Configured via {@code ...interceptors.i1.} keys:
 * {@code headerKey} (header key to overwrite, normally "topic"),
 * {@code headerKeyValue} (destination topic for the sink),
 * {@code dataMarkName} / {@code dataMarkValue} (JSON field added to the body).
 */
public class AddMarkInterceptor implements Interceptor {
    // Per-instance configuration injected by the Builder.
    // (Previously mutable static fields, which would be clobbered if more
    // than one interceptor instance were configured in the same JVM.)
    private final String headerKey;
    private final String headerKeyValue;
    private final String dataMarkName;
    private final String dataMarkValue;

    // Instances are created only through the Builder, which Flume calls
    // after configure(), so all fields are populated here.
    private AddMarkInterceptor(String headerKey, String headerKeyValue,
                               String dataMarkName, String dataMarkValue) {
        this.headerKey = headerKey;
        this.headerKeyValue = headerKeyValue;
        this.dataMarkName = dataMarkName;
        this.dataMarkValue = dataMarkValue;
    }

    @Override
    public void initialize() {
        // No resources to set up.
    }

    /**
     * Rewrites the topic header and adds the mark field to the JSON body.
     * Header example from a KafkaSource:
     * {topic=test, partition=0, offset=33, timestamp=1646794542175}
     *
     * @param event the event to modify in place
     * @return the same event, with header and body rewritten
     */
    @Override
    public Event intercept(Event event) {
        // Overwrite the topic header so the KafkaSink writes to our
        // configured topic instead of the source topic.
        Map<String, String> headers = event.getHeaders();
        headers.put(headerKey, headerKeyValue);
        event.setHeaders(headers);

        // Parse the body as JSON, inject the mark field, and write it back.
        String source = new String(event.getBody(), Charsets.UTF_8);
        // Fixed: was "JSonObject", a typo that does not compile.
        JSONObject sourceObj = JSONObject.parseObject(source);
        sourceObj.put(dataMarkName, dataMarkValue);
        // Fixed: was "toJSonString", a typo that does not compile.
        String msg = JSON.toJSONString(sourceObj, SerializerFeature.WriteMapNullValue);
        // Encode explicitly as UTF-8 to match the decode above; the original
        // getBytes() used the platform default charset.
        event.setBody(msg.getBytes(Charsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Events are modified in place, one at a time.
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    /**
     * Builder registered in the Flume config ({@code ...i1.type = AddMarkInterceptor$Builder}).
     * Flume calls {@link #configure(Context)} before {@link #build()}.
     */
    public static class Builder implements Interceptor.Builder {
        private String headerKey;
        private String headerKeyValue;
        private String dataMarkName;
        private String dataMarkValue;

        @Override
        public Interceptor build() {
            return new AddMarkInterceptor(headerKey, headerKeyValue, dataMarkName, dataMarkValue);
        }

        @Override
        public void configure(Context context) {
            // Key names match what follows "sync.sources.r1.interceptors.i1."
            // in the configuration file.
            headerKey = context.getString("headerKey");
            headerKeyValue = context.getString("headerKeyValue");
            dataMarkName = context.getString("dataMarkName");
            dataMarkValue = context.getString("dataMarkValue");
        }
    }
}



