栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka producer拦截器与consumer拦截器(interceptor)

Kafka producer拦截器与consumer拦截器(interceptor)

1. producer 拦截器(interceptor) 1.1 介绍

Producer 的Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等。Producer允许指定多个Interceptor按照指定顺序作用于一条消 息从而形成一个拦截链(interceptor chain)。

自定义的拦截器(interceptor)需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口,接口定义如下:

package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor extends Configurable {
    
    
    ProducerRecord onSend(ProducerRecord record);

    
    void onAcknowledgement(Recordmetadata metadata, Exception exception);
    
    void close();
}
  • onSend() : 方法封装进KafkaProducer.send()方法中,方法会在消息发送之前被调用,用户可以在该方法中对消息做任
    何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算;
  • onAcknowledgement(): 当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时调用此方法。KafkaProducer.send()异步发送有回调通知 callback, onAcknowledgement 的调用要早于 callback 的调用。
  • close(): 关闭Interceptor,主要用于执行一些资源清理工作。

如果指定了多个Interceptor,则Producer将按照指定顺序调用它们,如果interceptor出现异常Producer仅仅是捕获每个 Interceptor抛出的异常记录到错误日志中而非在向上传递。

1.2 案例

实现两个拦截器(interceptor),组成拦截链。第一个拦截器在消息发送前,给消息添加header。第二个拦截器统计消息的发送成功数和失败数。

PS:其实这两个功能可以放在一个interceptor中,这里仅仅是为了演示多个interceptor。

定义拦截器

拦截器1:

public class MyInterceptor implements ProducerInterceptor {

    
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        //消息的主题
        String topic = record.topic();
        Integer partition = record.partition();
        Integer key = record.key();
        String value = record.value();

        Headers headers = record.headers();
        //给header添加时间戳
        String stamp = System.currentTimeMillis()+"";
        headers.add("timestamp",stamp.getBytes(StandardCharsets.UTF_8));
        ProducerRecord resultRecord=
                  new ProducerRecord<>(topic,partition,key,value,headers);
        return resultRecord;
    }

    
    @Override
    public void onAcknowledgement(Recordmetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map configs) {

    }
}

拦截器2:

public class MyInterceptor02 implements ProducerInterceptor {

    private int errorNum=0;
    private int successNum=0;
    
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        return record;
    }

    
    @Override
    public void onAcknowledgement(Recordmetadata metadata, Exception exception) {
         if (exception==null){
             successNum++;
         }else {
            errorNum++;
         }
    }

    @Override
    public void close() {

        System.out.println("消息发送成功数: " + successNum);
        System.out.println("消息发送失败数: " + errorNum);
    }

    @Override
    public void configure(Map configs) {

    }
}
生产者

configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"拦截器类全路径");,多个拦截器使用,分割

public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 设置key的序列化类
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 设置value的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        //添加拦截器
        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyInterceptor,com.warybee.interceptor.MyInterceptor02");
        KafkaProducer kafkaProducer=new KafkaProducer(configs);
        //发送100条消息
        for (int i = 0; i < 100; i++) {
            ProducerRecord producerRecord=new ProducerRecord<>
                    (       "test_topic_1",
                            0,
                            i,
                            "test topic msg "+i);
            //消息的异步确认
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(Recordmetadata recordmetadata, Exception exception) {
                    if (exception==null){
                        System.out.println("消息的主题:"+recordmetadata.topic());
                        System.out.println("消息的分区:"+recordmetadata.partition());
                        System.out.println("消息的偏移量:"+recordmetadata.offset());
                    }else {
                        System.out.println("发送消息异常");
                    }
                }
            });

        }
        // 关闭生产者
        kafkaProducer.close();
    }
}
2 Consumer拦截器(interceptor) 2.1.介绍

消费者(Consumer)在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理,处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。

  • ConsumerInterceptor允许拦截甚至更改消费者接收到的消息。
  • 常用在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等。
  • ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了 错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
  • 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。
  • 从调用 KafkaConsumer.poll(long) 的同一线程调用 ConsumerInterceptor 回调。

自定义的拦截器(interceptor)需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口,接口定义如下:

public interface ConsumerInterceptor extends Configurable, AutoCloseable {

    
    ConsumerRecords onConsume(ConsumerRecords records);

    
    void onCommit(Map offsets);

    
    void close();
}

方法说明:

  • onConsume 该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。
  • onCommit 当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
2.2 案例 定义拦截器
public class MyConsumerInterceptor implements ConsumerInterceptor {

    @Override
    public ConsumerRecords onConsume(ConsumerRecords records) {
        //在这里可以对接收到的消息进行修改
        //如不做处理,直接返回即可
        return records;
    }

    @Override
    public void onCommit(Map offsets) {
        offsets.forEach((tp,offsetAndmetadata) -> {
            System.out.println(tp+" : "+offsetAndmetadata.offset());
        });
    }

    @Override
    public void close() {

    }

    
    @Override
    public void configure(Map configs) {
        configs.forEach((k, v) -> {
            System.out.println(k + "t" + v);
        });
    }
}
消费者

在消费者客户端配置中增加如下配置

如果有多个拦截器,用,分割即可

configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
public class KafkaConsumerDemo {

    public static void main(String[] args) {
        Map configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化类
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //value反序列化类
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
        //创建消费者对象
        KafkaConsumer consumer = new KafkaConsumer(configs);

        List topics = new ArrayList<>();
        topics.add("test_topic_1");
        //消费者订阅主题
        consumer.subscribe(topics);
        while (true){
            //批量拉取主题消息,每3秒拉取一次
            ConsumerRecords records = consumer.poll(3000);
            //变量消息
            for (ConsumerRecord record : records) {
                System.out.println(record.topic() + "t"
                        + record.partition() + "t"
                        + record.offset() + "t"
                        + record.key() + "t"
                        + record.value());

            }
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671288.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号