implementation "io.quarkus:quarkus-smallrye-reactive-messaging-kafka"如果是maven项目,在pom.xml中添加依赖
在application.properties文件中添加配置属性io.quarkus quarkus-smallrye-reactive-messaging-kafka
# kafka地址 kafka.bootstrap.servers=localhost:9092 # 设置连接器名称,my-outgoing-channel为生产者自定义的channel名称,my-incoming-channel为消费者自定义的channel名称 mp.messaging.outgoing.my-outgoing-channel.connector=smallrye-kafka mp.messaging.incoming.my-incoming-channel.connector=smallrye-kafka # 设置主题,将channel连接到指定的主题,此时如果主题未创建,则创建并连接 mp.messaging.outgoing.my-outgoing-channel.topic=my-test-topic mp.messaging.incoming.my-incoming-channel.topic=my-test-topic # 设置主题的分区数量为3,my-test-topic为你自定义的主题名字 quarkus.kafka.devservices.topic-partitions.my-test-topic=3 # 压缩消息 # 非GraalVM21+,使用mp.messaging.outgoing.fruit-out.compression.type=snappy代替 quarkus.kafka.snappy.enabled=true # 序列化,key的序列化,不设置默认为string,value需要手动设置 mp.messaging.incoming.my-outgoing-channel.value.serializer=org.apache.kafka.common.serialization.StringSerializer mp.messaging.incoming.my-incoming-channel.value.serializer=org.apache.kafka.common.serialization.StringSerializer生产者
@Path("/test")
@ApplicationScoped
public class KafkaProducer {
@Inject
@Broadcast // 广播到当前channel的所有消费者上,等同于mp.messaging.my-outgoing-channel.broadcast=true
@Channel"my-outgoing-channel") //my-outgoing-channel与配置文件里面的my-outgoing-channel是同一个
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 256) //在注入的Emitter上配置背压策略
Emitter emitter;
//使用Emitter且配置表明my-outgoing-channel通道使用smallrye-kafka连接器
//然后Quarkus将自动将value.serializer设置为Kafka内置的StringSerializer(泛型对应的序列化)
@POST
@Produces
@Consumes(MediaType.APPLICATION_JSON)
public void send(@NotBlank String message) {
emitter.send(Message.of(price)
.withAck(() -> {
// 提交成功的处理
return CompletableFuture.completedFuture(null);
})
.withNack(throwable -> {
// 提交失败的处理
return CompletableFuture.completedFuture(null);
}));
}
}
消费者
@ApplicationScoped
public class KafkaConsumer {
@Incoming("my-incoming-channel") // my-incoming-channel与配置文件里面的my-incoming-channel是同一个
// 可以使用@Merge注解接收多个channel的消息,等同于mp.messaging.incoming.my-incoming-channel.merge=true
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
//@Acknowledgment用于消息确认,Strategy.PRE_PROCESSING表示收到消息执行代码前自动确认
public void consume(String msg) {
System.out.println("收到消息:" + msg);
}
}
其他配置参数说明
#是否启用活动性和就绪性检查(false为禁用) mp.messaging.incoming.my-incoming-channel.health-enabled=false mp.messaging.outgoing.my-outgoing-channel.health-enabled=false mp.messaging.incoming.my-incoming-channel.health-readiness-enabled=false mp.messaging.outgoing.my-outgoing-channel.health-readiness-enabled=false #压缩消息 quarkus.kafka.snappy.enabled=true # 序列化 mp.messaging.incoming.my-incoming-channel.value.serializer=org.apache.kafka.common.serialization.StringSerializer mp.messaging.outgoing.my-outgoing-channel.value.serializer=org.apache.kafka.common.serialization.StringSerializer #更多配置请参考文档第21项内容参考文档
https://quarkus.io/guides/kafka#quarkus-extension-for-apache-kafka



