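<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.6</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.3.11</version>
    </dependency>
</dependencies>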
关键依赖包:reactor-kafka
2.创建监听Template
package com.kittlen.cloud.reactivekafka.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
/**
 * Declares the beans needed to consume a single Kafka topic reactively:
 * the reactor-kafka {@link ReceiverOptions} and the
 * {@link ReactiveKafkaConsumerTemplate} built on top of them.
 *
 * <p>NOTE(review): key and value types are declared as {@code String}; this
 * assumes String deserializers are configured in the Kafka consumer
 * properties - adjust the type arguments if other deserializers are used.
 */
@Component
public class ReactiveConsumerConfig {

    /**
     * Builds receiver options from the application's Kafka consumer properties
     * and subscribes them to one topic.
     *
     * @param topic           topic name, resolved from the {@code kafka.consumer.topic} property
     * @param kafkaProperties Spring Boot Kafka properties used to build the consumer configuration
     * @return receiver options subscribed to {@code topic}
     */
    @Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions(
            @Value(value = "${kafka.consumer.topic}") String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions =
                ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    /**
     * Creates the reactive consumer template from the receiver options.
     *
     * @param kafkaReceiverOptions receiver options describing the subscription
     * @return a consumer template bound to those options
     */
    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
}
3.根据Template创建对应的实际监听业务
package com.kittlen.cloud.reactivekafka.consumers;
import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverRecord;
@Service
public class ReactiveConsumerService implements CommandLineRunner {
protected Log log = LogFactory.getLog(ReactiveConsumerService.class);
@Autowired
ReactiveKafkaConsumerTemplate requestMsgReactiveKafkaConsumerTemplate;
private Flux> dgkConsummer() {
Flux> monoFlux = requestMsgReactiveKafkaConsumerTemplate
.receiveAutoAck()
.map(cr -> handler(cr))
.doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
return monoFlux;
}
//返回类型根据实际需求自己进行调整
//在该方法里面如果直接抛出异常,会直接导致停止对该topic的监听
protected Mono handler(ConsumerRecord consumerRecord) {
try{
return Mono.just(true);
}catch (Exception e) {
return Mono.error(e);
}
}
@Override
public void run(String... args) {
dgkConsummer().subscribe(m -> m.subscribe());
}
}
4.监听多topic
使用同一个ReactiveKafkaConsumerTemplate
创建kafkaReceiverOptions时订阅多个topic
/**
 * Builds receiver options subscribed to several topics at once, so that a
 * single consumer template receives records from all of them.
 *
 * <p>NOTE(review): String key/value types are assumed - confirm against the
 * configured deserializers.
 *
 * @param kafkaProperties Spring Boot Kafka properties used to build the consumer configuration
 * @return receiver options subscribed to {@code topic1} and {@code topic2}
 */
@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions =
            ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    return basicReceiverOptions.subscription(Stream.of("topic1", "topic2").collect(Collectors.toList()));
}
处理消息时根据topic进行判断
protected Mono<Boolean> handler(ConsumerRecord<String, String> consumerRecord) {
    try {
        if (consumerRecord.topic().equals("topic1")) {
            // handle records coming from topic1
        }
        return Mono.just(true);
    } catch (Exception e) {
        return Mono.error(e);
    }
}
定义多个监听template,再根据template创建对应的实际监听业务
/**
 * Builds receiver options subscribed only to {@code topic1}.
 *
 * <p>NOTE(review): String key/value types are assumed - confirm against the
 * configured deserializers.
 *
 * @param kafkaProperties Spring Boot Kafka properties used to build the consumer configuration
 * @return receiver options subscribed to {@code topic1}
 */
@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions1(KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions =
            ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    return basicReceiverOptions.subscription(Collections.singletonList("topic1"));
}
/**
 * Creates the consumer template for the first set of receiver options.
 *
 * @param kafkaReceiverOptions1 receiver options to bind the template to
 * @return a consumer template bound to {@code kafkaReceiverOptions1}
 */
@Bean
public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate1(
        ReceiverOptions<String, String> kafkaReceiverOptions1) {
    return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions1);
}
/**
 * Builds receiver options subscribed only to {@code topic2}.
 *
 * <p>NOTE(review): String key/value types are assumed - confirm against the
 * configured deserializers.
 *
 * @param kafkaProperties Spring Boot Kafka properties used to build the consumer configuration
 * @return receiver options subscribed to {@code topic2}
 */
@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions2(KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions =
            ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    return basicReceiverOptions.subscription(Collections.singletonList("topic2"));
}
/**
 * Creates the consumer template for the second set of receiver options.
 *
 * @param kafkaReceiverOptions2 receiver options to bind the template to
 * @return a consumer template bound to {@code kafkaReceiverOptions2}
 */
@Bean
public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate2(
        ReceiverOptions<String, String> kafkaReceiverOptions2) {
    return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions2);
}
@Resource(name = "reactiveKafkaConsumerTemplate1")
ReactiveKafkaConsumerTemplate requestMsgReactiveKafkaConsumerTemplate1;
@Resource(name = "reactiveKafkaConsumerTemplate2")
ReactiveKafkaConsumerTemplate requestMsgReactiveKafkaConsumerTemplate2;
private Flux> dgkConsummer1() {
Flux> monoFlux = requestMsgReactiveKafkaConsumerTemplate1
.receiveAutoAck()
.map(cr -> handler1(cr))
.doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
return monoFlux;
}
//返回类型根据实际需求自己进行调整
protected Mono handler1(ConsumerRecord consumerRecord) {
try{
return Mono.just(true);
}catch (Exception e) {
return Mono.error(e);
}
}
private Flux> dgkConsummer2() {
Flux> monoFlux = requestMsgReactiveKafkaConsumerTemplate2
.receiveAutoAck()
.map(cr -> handler2(cr))
.doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
return monoFlux;
}
//返回类型根据实际需求自己进行调整
protected Mono handler2(ConsumerRecord consumerRecord) {
try{
return Mono.just(true);
}catch (Exception e) {
return Mono.error(e);
}
}
/**
 * Starts both consuming pipelines once the application is up; every handler
 * result publisher is subscribed so that the handler logic actually runs.
 *
 * @param args command line arguments (unused)
 */
@Override
public void run(String... args) {
    // Start the pipeline of the first template (dgkConsummer1), not the
    // single-template dgkConsummer(), so the first template is consumed.
    dgkConsummer1().subscribe(m -> m.subscribe());
    dgkConsummer2().subscribe(m -> m.subscribe());
}



