栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Quarkus使用Kafka发送及接收消息

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Quarkus使用Kafka发送及接收消息

Quarkus使用Kafka发送及接收消息 如果是gradle项目,在build.gradle中添加依赖
implementation "io.quarkus:quarkus-smallrye-reactive-messaging-kafka"
如果是maven项目,在pom.xml中添加依赖

  io.quarkus
  quarkus-smallrye-reactive-messaging-kafka

在application.properties文件中添加配置属性
# kafka地址
kafka.bootstrap.servers=localhost:9092

# 设置连接器名称,my-outgoing-channel为生产者自定义的channel名称,my-incoming-channel为消费者自定义的channel名称
mp.messaging.outgoing.my-outgoing-channel.connector=smallrye-kafka
mp.messaging.incoming.my-incoming-channel.connector=smallrye-kafka

# 设置主题,将channel连接到指定的主题,此时如果主题未创建,则创建并连接
mp.messaging.outgoing.my-outgoing-channel.topic=my-test-topic
mp.messaging.incoming.my-incoming-channel.topic=my-test-topic

# 设置主题的分区数量为3,my-test-topic为你自定义的主题名字
quarkus.kafka.devservices.topic-partitions.my-test-topic=3

# 压缩消息
# 非GraalVM21+,使用mp.messaging.outgoing.fruit-out.compression.type=snappy代替
quarkus.kafka.snappy.enabled=true

# 序列化,key的序列化,不设置默认为string,value需要手动设置
mp.messaging.incoming.my-outgoing-channel.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.my-incoming-channel.value.serializer=org.apache.kafka.common.serialization.StringSerializer
生产者
@Path("/test")
@ApplicationScoped
public class KafkaProducer {
    
    
    @Inject
    @Broadcast // 广播到当前channel的所有消费者上,等同于mp.messaging.my-outgoing-channel.broadcast=true
    @Channel"my-outgoing-channel") //my-outgoing-channel与配置文件里面的my-outgoing-channel是同一个
    @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 256) //在注入的Emitter上配置背压策略
    Emitter emitter;
    //使用Emitter且配置表明my-outgoing-channel通道使用smallrye-kafka连接器
    //然后Quarkus将自动将value.serializer设置为Kafka内置的StringSerializer(泛型对应的序列化)
    
    @POST
    @Produces
    @Consumes(MediaType.APPLICATION_JSON)
    public void send(@NotBlank String message) {
        emitter.send(Message.of(price)
            .withAck(() -> {
                // 提交成功的处理
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // 提交失败的处理
                return CompletableFuture.completedFuture(null);
            }));
    }
    
}
消费者
@ApplicationScoped
public class KafkaConsumer {

    @Incoming("my-incoming-channel") // my-incoming-channel与配置文件里面的my-incoming-channel是同一个
    // 可以使用@Merge注解接收多个channel的消息,等同于mp.messaging.incoming.my-incoming-channel.merge=true
    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    //@Acknowledgment用于消息确认,Strategy.PRE_PROCESSING表示收到消息执行代码前自动确认
    public void consume(String msg) {
        System.out.println("收到消息:" + msg);
    }

}
其他配置参数说明
#是否启用活动性和就绪性检查(false为禁用)
mp.messaging.incoming.my-incoming-channel.health-enabled=false
mp.messaging.outgoing.my-outgoing-channel.health-enabled=false
mp.messaging.incoming.my-incoming-channel.health-readiness-enabled=false
mp.messaging.outgoing.my-outgoing-channel.health-readiness-enabled=false

#压缩消息
quarkus.kafka.snappy.enabled=true

# 序列化
mp.messaging.incoming.my-incoming-channel.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.my-outgoing-channel.value.serializer=org.apache.kafka.common.serialization.StringSerializer

#更多配置请参考文档第21项内容
参考文档

https://quarkus.io/guides/kafka#quarkus-extension-for-apache-kafka

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/827609.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号