While recently taking over a project, I found its Kafka consumer code quite messy: the consumer classes carried large amounts of configuration code, pulled messages manually, and started their own consuming threads. Not elegant.
The ideal approach is to maintain the Kafka consumer configuration in one place; when defining a consumer bean, you only specify the topic and group and implement the consumption logic.
Starting with spring-kafka 2.2.4 (a Spring for Apache Kafka version, not kafka-clients), you can mark a consumer method directly with the @KafkaListener annotation, and consumer properties set on the annotation override any properties with the same name configured in the consumer factory. Usage is described below.
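As a quick illustration of the override behavior, spring-kafka (2.2.4+) lets you set consumer properties directly on the annotation via its properties attribute; these win over the same-named entries in the factory's configuration. A minimal sketch (topic and group names here are hypothetical):

```java
// Hypothetical listener: max.poll.records set on the annotation overrides
// the value configured in the consumer factory, for this listener only.
@KafkaListener(
        topics = "demo-topic",
        groupId = "demo-group",
        properties = {"max.poll.records=100", "fetch.max.wait.ms=1000"}
)
public void onDemoMessage(String message) {
    // consume the message
}
```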
1. Define the consumer factory ConcurrentKafkaListenerContainerFactory
First, the Maven dependencies:
<properties>
    <kafka.client.version>2.3.1</kafka.client.version>
    <spring-kafka.version>1.3.9.RELEASE</spring-kafka.version>
</properties>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.client.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
</dependency>
@EnableKafka
@Configuration
public class KafkaConfig {
public Map<String, Object> consumerConfigs(String kafkaServerUrls) {
Map<String, Object> props = Maps.newHashMap();
// broker server addresses
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrls);
// periodic auto-commit of consumed offsets; set to false for batch consumption
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// session timeout: if the broker receives no heartbeat within this window, it evicts the consumer from the group
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ConfigManager.getString("kafka.session.timeout.ms", "60000"));
// maximum number of records returned per poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, ConfigManager.getString("kafka.max.poll.records", "500"));
// deserialization
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// default groupId; if not configured here, declare it on the annotation
props.put(ConsumerConfig.GROUP_ID_CONFIG, ConfigManager.getString(BizConstants.KAFKA_GROUP_ID));
// if there is no initial offset in Kafka, or the current offset no longer exists on the
// server (e.g. because the data has been deleted), reset to the latest offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// authentication
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("security.protocol", "SASL_PLAINTEXT");
// username and password
props.put("sasl.jaas.config", ConfigManager.getString(BizConstants.KAFKA_SASL_JAAS_CONFIG));
return props;
}
public ConsumerFactory<String, String> initConsumerFactory(String kafkaServerUrls) {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaServerUrls));
}
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> initKafkaListenerContainerFactory(String kafkaServerUrls) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(initConsumerFactory(kafkaServerUrls));
factory.setConcurrency(ConfigManager.getInteger("kafka.concurrency", 1));
// batch listener: the listener method receives a List of records
factory.setBatchListener(true);
// offsets are committed in BATCH mode; note that to receive an Acknowledgment
// parameter in the listener, MANUAL or MANUAL_IMMEDIATE is usually required instead
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
factory.getContainerProperties().setPollTimeout(ConfigManager.getInteger("kafka.poll.timeout", 3000));
return factory;
}
@Bean(name = "moliKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> moliKafkaListenerContainerFactory() {
return initKafkaListenerContainerFactory(ConfigManager.get("gameCenter.brokers", "moli-kafka.prd.lan:9092"));
}
@Bean(name = "xiangrikuiKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> xiangrikuiKafkaListenerContainerFactory() {
return initKafkaListenerContainerFactory(ConfigManager.get("minigame.brokers", "xiangrikui-kafka.prd.lan:9092"));
}
}
2. Define the consumer with KafkaListener
@Slf4j
@Component
public class GameCenterPushKafkaListener {
@Autowired
private PushFrequencyService pushFrequencyService;
@Autowired
@Qualifier("kafkaThreadPool")
private ThreadPoolTaskExecutor executor;
@KafkaListener(
containerFactory = "moliKafkaListenerContainerFactory",
topics = "${gameCenter.topic}",
groupId = "${gameCenter.consumer.group}"
)
public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
if (CollectionUtils.isNotEmpty(records)) {
records.forEach(this::processSingleRecord);
}
try {
ack.acknowledge();
} catch (Exception e) {
log.error("GameCenterPushKafkaListener_kafka_ack_error.", e);
}
}
private void processSingleRecord(ConsumerRecord<String, String> record) {
executor.submit(() -> pushFrequencyService.recordUserPushCount(record));
}
}
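The listener above hands each record off to a kafkaThreadPool executor whose definition is not shown in the original. A minimal sketch of what that bean might look like (the pool sizes and rejection policy are assumptions, not the project's actual values):

```java
@Bean(name = "kafkaThreadPool")
public ThreadPoolTaskExecutor kafkaThreadPool() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);            // assumed sizing
    executor.setMaxPoolSize(16);
    executor.setQueueCapacity(1000);
    executor.setThreadNamePrefix("kafka-consume-");
    // CallerRunsPolicy applies backpressure to the listener thread when the queue is full
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}
```

Note that because offsets are acknowledged right after submitting the tasks, a crash can lose records that were queued but not yet processed; whether that trade-off is acceptable depends on the business logic.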
3. @KafkaListener annotation attributes
public @interface KafkaListener {
String id() default "";
String containerFactory() default "";
String[] topics() default {};
String topicPattern() default "";
TopicPartition[] topicPartitions() default {};
String errorHandler() default "";
String groupId() default "";
boolean idIsGroup() default true;
}
3.1 id: the listener's id
(1) The id attribute can be used to name the consumer thread, as follows:
With id = "consumer-id5", the thread name looks like this:
2022-8-8 17:27:30 c.d.b.k.KafkaListeners 120 [INFO] Thread:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 consumed
With no id configured, the thread name looks like this:
2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] Thread:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7
(2) Note that listener ids must not be duplicated within the same application context;
otherwise startup fails with:
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id...
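For illustration (method and topic names are hypothetical), registering two listeners with the same id in one application is what triggers that exception:

```java
// Both listeners declare id = "consumer-id7", so startup fails with
// IllegalStateException: Another endpoint is already registered with id...
@KafkaListener(id = "consumer-id7", topics = "topic-a")
public void listenA(String message) { }

@KafkaListener(id = "consumer-id7", topics = "topic-b")
public void listenB(String message) { }
```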
(3) The id overrides the consumer factory's consumer group (GroupId)
For example, suppose the consumer factory is configured with the consumer group kafka.consumer.group-id=BASE-DEMO, which is the default group for that container. If you then set @KafkaListener(id = "consumer-id7"), the consumer group of this consumer becomes consumer-id7. If you do not want the id to act as the groupId, set idIsGroup = false, and the default GroupId is used instead.
(4) If the groupId attribute is configured, it takes the highest priority:
@KafkaListener(id = "consumer-id5", idIsGroup = false, topics = "My_TOPIC", groupId = "groupId-test")
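The precedence rules in (3) and (4) can be summarized in plain Java. This is a simplified sketch of the observable behavior, not spring-kafka's actual implementation:

```java
public class GroupIdResolution {
    /**
     * Simplified sketch of @KafkaListener group-id precedence:
     * an explicit groupId wins; otherwise id is used when idIsGroup is true
     * (the default); otherwise the factory's default group.id applies.
     */
    public static String resolveGroupId(String groupId, String id, boolean idIsGroup, String factoryDefault) {
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        if (id != null && !id.isEmpty() && idIsGroup) {
            return id;
        }
        return factoryDefault;
    }

    public static void main(String[] args) {
        // groupId beats everything else
        System.out.println(resolveGroupId("groupId-test", "consumer-id5", false, "BASE-DEMO"));
        // id acts as the group when idIsGroup is true
        System.out.println(resolveGroupId(null, "consumer-id7", true, "BASE-DEMO"));
        // neither set: fall back to the factory default
        System.out.println(resolveGroupId(null, null, true, "BASE-DEMO"));
    }
}
```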
3.2 The listener's topic
There are three ways to configure topics: topics, topicPattern, and topicPartitions; choose exactly one.
(1) The topics attribute is the simplest and accepts multiple topics:
@KafkaListener(
topics = {"SHI_TOPIC3","SHI_TOPIC4"},
groupId = "${gameCenter.consumer.group}"
)
(2) topicPattern supports regular expressions:
@KafkaListener(id = "pullPatternMsg", topicPattern = "rx_.*_.*_thing.*", concurrency = "1")
public void pullPatternMsg(@Payload String data,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ByteBuffer key,
Acknowledgment ack, // manual offset commit
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offSet,
Consumer<?, ?> consumer // the consumer instance
) {
// ...
}
(3) topicPartitions: explicit partition assignment; you can configure explicit topics and partitions (and, optionally, initial offsets) for the listener:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1",
initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
The example above listens to partitions 0 and 1 of topic1, and to partition 0 of topic2, while partition 1 of topic2 starts consuming from offset 100.
3.3 errorHandler: the exception handler
You can try/catch manually inside the consumer, or implement KafkaListenerErrorHandler to reuse the error-handling logic:
@Component("kafkaErrorHandler")
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
return null;
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
// do something
return null;
}
}
When using it, set errorHandler to the bean name, as follows:
@KafkaListener(
containerFactory = "moliKafkaListenerContainerFactory",
topics = "${gameCenter.topic}",
groupId = "${gameCenter.consumer.group}",
errorHandler = "kafkaErrorHandler"
)
3.4 containerFactory: the listener container factory
This is where the Kafka configuration lives:
@Bean(name = "moliKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> moliKafkaListenerContainerFactory() {
return initKafkaListenerContainerFactory(ConfigManager.get("gameCenter.brokers", "moli-kafka.prd.lan:9092"));
}
@KafkaListener(
containerFactory = "moliKafkaListenerContainerFactory",
topics = "${gameCenter.topic}",
groupId = "${gameCenter.consumer.group}"
)
References:
@KafkaListener explained and usage - 石臻臻的杂货铺, DevPress community
@KafkaListener in Kafka
Spring Kafka consumption modes (single, batch) and acknowledgment modes (automatic, manual) with examples