Producer 的Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等。Producer允许指定多个Interceptor按照指定顺序作用于一条消 息从而形成一个拦截链(interceptor chain)。
自定义的拦截器(interceptor)需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口,接口定义如下:
package org.apache.kafka.clients.producer; import org.apache.kafka.common.Configurable; public interface ProducerInterceptorextends Configurable { ProducerRecord onSend(ProducerRecord record); void onAcknowledgement(Recordmetadata metadata, Exception exception); void close(); }
- onSend() : 方法封装进KafkaProducer.send()方法中,方法会在消息发送之前被调用,用户可以在该方法中对消息做任
何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算; - onAcknowledgement(): 当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时调用此方法。KafkaProducer.send()异步发送有回调通知 callback, onAcknowledgement 的调用要早于 callback 的调用。
- close(): 关闭Interceptor,主要用于执行一些资源清理工作。
如果指定了多个Interceptor,则Producer将按照指定顺序调用它们,如果interceptor出现异常Producer仅仅是捕获每个 Interceptor抛出的异常记录到错误日志中而非在向上传递。
1.2 案例实现两个拦截器(interceptor),组成拦截链。第一个拦截器在消息发送前,给消息添加header。第二个拦截器统计消息的发送成功数和失败数。
PS:其实这两个功能可以放在一个interceptor中,这里仅仅是为了演示多个interceptor。
定义拦截器拦截器1:
public class MyInterceptor implements ProducerInterceptor{ @Override public ProducerRecord onSend(ProducerRecord record) { //消息的主题 String topic = record.topic(); Integer partition = record.partition(); Integer key = record.key(); String value = record.value(); Headers headers = record.headers(); //给header添加时间戳 String stamp = System.currentTimeMillis()+""; headers.add("timestamp",stamp.getBytes(StandardCharsets.UTF_8)); ProducerRecord resultRecord= new ProducerRecord<>(topic,partition,key,value,headers); return resultRecord; } @Override public void onAcknowledgement(Recordmetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map configs) { } }
拦截器2:
public class MyInterceptor02 implements ProducerInterceptor生产者{ private int errorNum=0; private int successNum=0; @Override public ProducerRecord onSend(ProducerRecord record) { return record; } @Override public void onAcknowledgement(Recordmetadata metadata, Exception exception) { if (exception==null){ successNum++; }else { errorNum++; } } @Override public void close() { System.out.println("消息发送成功数: " + successNum); System.out.println("消息发送失败数: " + errorNum); } @Override public void configure(Map configs) { } }
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"拦截器类全路径");,多个拦截器使用,分割
public class KafkaProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
Map configs = new HashMap<>();
// 设置连接Kafka的初始连接用到的服务器地址
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
// 设置key的序列化类
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
// 设置value的序列化类
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.ACKS_CONFIG,"all");
//添加拦截器
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyInterceptor,com.warybee.interceptor.MyInterceptor02");
KafkaProducer kafkaProducer=new KafkaProducer(configs);
//发送100条消息
for (int i = 0; i < 100; i++) {
ProducerRecord producerRecord=new ProducerRecord<>
( "test_topic_1",
0,
i,
"test topic msg "+i);
//消息的异步确认
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(Recordmetadata recordmetadata, Exception exception) {
if (exception==null){
System.out.println("消息的主题:"+recordmetadata.topic());
System.out.println("消息的分区:"+recordmetadata.partition());
System.out.println("消息的偏移量:"+recordmetadata.offset());
}else {
System.out.println("发送消息异常");
}
}
});
}
// 关闭生产者
kafkaProducer.close();
}
}
2 Consumer拦截器(interceptor)
2.1.介绍
消费者(Consumer)在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理,处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。
- ConsumerInterceptor允许拦截甚至更改消费者接收到的消息。
- 常用在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等。
- ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了 错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
- 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。
- 从调用 KafkaConsumer.poll(long) 的同一线程调用 ConsumerInterceptor 回调。
自定义的拦截器(interceptor)需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口,接口定义如下:
public interface ConsumerInterceptorextends Configurable, AutoCloseable { ConsumerRecords onConsume(ConsumerRecords records); void onCommit(Map offsets); void close(); }
方法说明:
- onConsume 该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。
- onCommit 当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
public class MyConsumerInterceptor implements ConsumerInterceptor消费者{ @Override public ConsumerRecords onConsume(ConsumerRecords records) { //在这里可以对接收到的消息进行修改 //如不做处理,直接返回即可 return records; } @Override public void onCommit(Map offsets) { offsets.forEach((tp,offsetAndmetadata) -> { System.out.println(tp+" : "+offsetAndmetadata.offset()); }); } @Override public void close() { } @Override public void configure(Map configs) { configs.forEach((k, v) -> { System.out.println(k + "t" + v); }); } }
在消费者客户端配置中增加如下配置
如果有多个拦截器,用,分割即可
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
public class KafkaConsumerDemo {
public static void main(String[] args) {
Map configs = new HashMap<>();
// 设置连接Kafka的初始连接用到的服务器地址
// 如果是集群,则可以通过此初始连接发现集群中的其他broker
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
//KEY反序列化类
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
//value反序列化类
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
//创建消费者对象
KafkaConsumer consumer = new KafkaConsumer(configs);
List topics = new ArrayList<>();
topics.add("test_topic_1");
//消费者订阅主题
consumer.subscribe(topics);
while (true){
//批量拉取主题消息,每3秒拉取一次
ConsumerRecords records = consumer.poll(3000);
//变量消息
for (ConsumerRecord record : records) {
System.out.println(record.topic() + "t"
+ record.partition() + "t"
+ record.offset() + "t"
+ record.key() + "t"
+ record.value());
}
}
}
}



