栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

编码技巧——@KafkaListener的使用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

编码技巧——@KafkaListener的使用

最近在接手某个项目代码时,发现关于Kafka的consumer相关的代码写的很乱,consumer中写了大量的配置的代码,并且手动的拉取消息,并开启线程消费,不够优雅;

理想的做法是单独维护kafka的consumer配置,在定义consumer的bean时,指定topic和group,仅实现消费逻辑;

从kafka-clients的2.2.4版本开始,可以直接使用@KafkaListener注解来标记消费者,注解的属性将覆盖在消费者工厂中配置的具有相同名称的所有属性,下面介绍使用方法;

2.3.1
1.3.9.RELEASE



    org.apache.kafka
    kafka-clients
    ${kafka.client.version}


    org.springframework.kafka
    spring-kafka
    ${spring-kafka.version}

1. 定义消费者工厂ConcurrentKafkaListenerContainerFactory
@EnableKafka
@Configuration
public class KafkaConfig {

    
    public Map consumerConfigs(String kafkaServerUrls) {
        Map props = Maps.newHashMap();
        // broker server地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrls);
        // 自动提交(按周期)已消费offset 批量消费下设置false
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // session超时时间 broker提出consumer的心跳间隔
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ConfigManager.getString("kafka.session.timeout.ms", "60000"));
        // 最大消息拉取条数
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, ConfigManager.getString("kafka.max.poll.records", "500"));
        // 序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 默认的groupId 未配置则在注解中声明
        props.put(ConsumerConfig.GROUP_ID_CONFIG, ConfigManager.getString(BizConstants.KAFKA_GROUP_ID));
        // 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // 加密
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("security.protocol", "SASL_PLAINTEXT");
        // 账号密码
        props.put("sasl.jaas.config", ConfigManager.getString(BizConstants.KAFKA_SASL_JAAS_CONFIG));
        return props;
    }

    
    public ConsumerFactory initConsumerFactory(String kafkaServerUrls) {
        return new DefaultKafkaConsumerFactory(consumerConfigs(kafkaServerUrls));
    }

    public KafkaListenerContainerFactory> initKafkaListenerContainerFactory(String kafkaServerUrls) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(initConsumerFactory(kafkaServerUrls));
        factory.setConcurrency(ConfigManager.getInteger("kafka.concurrency", 1));
        // listener类型为批量batch类型
        factory.setBatchListener(true);
        // offset提交模式为batch
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
        factory.getContainerProperties().setPollTimeout(ConfigManager.getInteger("kafka.poll.timeout", 3000));
        return factory;
    }

    
    @Bean(name = "moliKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory> moliKafkaListenerContainerFactory() {
        return initKafkaListenerContainerFactory(ConfigManager.get("gameCenter.brokers", "moli-kafka.prd.lan:9092"));
    }

    
    @Bean(name = "xiangrikuiKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory> xiangrikuiKafkaListenerContainerFactory() {
        return initKafkaListenerContainerFactory(ConfigManager.get("minigame.brokers", "xiangrikui-kafka.prd.lan:9092"));
    }

}

2. 定义消费者KafkaListener
@Slf4j
@Component
public class GameCenterPushKafkaListener {

    @Autowired
    private PushFrequencyService pushFrequencyService;

    @Autowired
    @Qualifier("kafkaThreadPool")
    private ThreadPoolTaskExecutor executor;

    @KafkaListener(
            containerFactory = "moliKafkaListenerContainerFactory",
            topics = "${gameCenter.topic}",
            groupId = "${gameCenter.consumer.group}"
    )
    public void onMessage(List> records, Acknowledgment ack) {
        if (CollectionUtils.isNotEmpty(records)) {
            records.forEach(this::processSingleRecord);
        }
        try {
            ack.acknowledge();
        } catch (Exception e) {
            log.error("GameCenterPushKafkaListener_kafka_ack_error.", e);
        }
    }

    private void processSingleRecord(ConsumerRecord record) {
        executor.submit(() -> {
            pushFrequencyService.recordUserPushCount(record);
        });
    }

3. @KafkaListener注解属性说明
public @interface KafkaListener {

	
	String id() default "";

	
	String containerFactory() default "";

	
	String[] topics() default {};

	
	String topicPattern() default "";

	
	TopicPartition[] topicPartitions() default {};

	
	String errorHandler() default "";

	
	String groupId() default "";

	
	boolean idIsGroup() default true;

}

3.1 id 监听器的id

(1)监听器id属性,可用来命名消费者线程,如下:

填写id = "consumer-id5",线程名如下:

    2022-8-8 17:27:30 c.d.b.k.KafkaListeners 120 [INFO] 线程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消费

没有填写ID,线程名如下:

    2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 线程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

(2)需要注意,在相同容器中,监听器ID不能重复

否则会报错:

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id...

(3)会覆盖消费者工厂的消费组GroupId

例如,消费者工厂属性配置了消费组kafka.consumer.group-id=BASE-DEMO,它是该容器中的默认消费组;但是如果设置了 @KafkaListener(id = "consumer-id7"),那么当前消费者的消费组就是consumer-id7;当然如果你不想要他作为groupId的话可以设置属性idIsGroup = false,那么还是会使用默认的GroupId;

(4)如果配置了属性groupId,则groupId优先级最高

 @KafkaListener(id = "consumer-id5", idIsGroup = false, topics = "My_TOPIC", groupId = "groupId-test")

3.2 监听器的topic

关于topic的配置有3种,topics、topicPattern、topicPartitions 三选一;

(1)topics属性,这种方式最简单,可以指定多个topic

    @KafkaListener(
			topics = {"SHI_TOPIC3","SHI_TOPIC4"},
            groupId = "${gameCenter.consumer.group}"
    )

(2)topicPattern,支持表达式

 
    @KafkaListener(id = "pullPatternMsg", topicPattern = "rx_.*_.*_thing.*", concurrency = "1")
    public void pullPatternMsg(@Payload String data,
                               @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                               @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ByteBuffer key,
                               Acknowledgment ack, //手动提交offset
                               @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                               @Header(KafkaHeaders.OFFSET) long offSet,
                             
                               Consumer consumer //消费者
    )

(3)topicPartitions显式分区分配,可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
          partitionOffsets = @PartitionOffset(partition = "1", 
          initialOffset = "100"))
        })
public void listen(ConsumerRecord record) {
    ...
}

上面例子意思是 监听topic1的0,1分区;监听topic2的第0分区,并且第1分区从offset为100的开始消费;

3.3 errorHandler异常处理器

可以在consumer中手动try/catch,也可以实现KafkaListenerErrorHandler复用异常处理逻辑;

@Component("kafkaErrorHandler")
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message message, ListenerExecutionFailedException exception) {
        return null;
    }
 
    @Override
    public Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer) {
        //do someting
        return null;
    }
}

调用的时候errorHandler的值填写beanName,如下:

    @KafkaListener(
            containerFactory = "moliKafkaListenerContainerFactory",
            topics = "${gameCenter.topic}",
            groupId = "${gameCenter.consumer.group}",
            errorHandler = "kafkaErrorHandler"
    )

3.4 containerFactory监听器工厂

kafka的配置就放在这里;

    @Bean(name = "moliKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory> moliKafkaListenerContainerFactory() {
        return initKafkaListenerContainerFactory(ConfigManager.get("gameCenter.brokers", "moli-kafka.prd.lan:9092"));
    }
    @KafkaListener(
            containerFactory = "moliKafkaListenerContainerFactory",
            topics = "${gameCenter.topic}",
            groupId = "${gameCenter.consumer.group}"
    )

参考:

@KafkaListener详解与使用_python_石臻臻的杂货铺-DevPress官方社区

kafka之@KafkaListener

Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1040443.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号