网上都是消息监听,接口调用时,同一个消费组,无法使用为同一个消费者,所以需要取消订阅。
Properties properties=kafkaConfig.getProperties();
// 将参数设置到消费者参数中
KafkaConsumer consumer = new KafkaConsumer(properties);
List list=new ArrayList<>();
if(authorizeCache!=null) {
String topic=type + authorizeCache.getScheme();
consumer.subscribe(Arrays.asList(topic));
ConsumerRecords consumerRecords= consumer.poll(Duration.ofMillis(1000));
consumerRecords.forEach(record -> {
list.add(record.value());
});
}
consumer.commitSync();//同步提交
consumer.unsubscribe();//取消订阅



