栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RocketMQ使用广播消息

RocketMQ使用广播消息

目录
  • 说明
  • 生产端
  • 消费端
  • 总结

说明

RocketMQ消息模式主要有两种:
(1)、MessageModel.CLUSTERING:集群模式。同一消费者组内的每个消费者,只消费到Topic的一部分消息,所有消费者消费的消息加起来就是Topic的所有消息。
(2)、MessageModel.BROADCASTING:广播模式。同一消费者组内的每个消费者,都消费到Topic的所有消息。如Topic有100条消息,则同个消费者组下的所有消费者都能消费到100条消息。

消息广播,主要配置在于消费者通过配置消息模式MessageModel)为MessageModel.BROADCASTING实现。

生产端
@Test
public void sendMessage() throws Exception {
    DefaultMQProducer defaultMQProducer = RocketMqUtil.getDefaultMQProducer();
    Message message = new Message(RocketMqUtil.TOPIC, "broadcasting",
            "broadcasting-message".getBytes(Charset.forName("UTF-8")));
    //按正常操作发送消息
    SendResult sendResult = defaultMQProducer.send(message);
    log.info("发送消息结果:{}", sendResult.getSendStatus().name());
}

生产端按正常发送逻辑发送消息即可。

消费端
@Test
public void consumer() throws Exception {
    DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
    //设置消费消息为广播模式
    defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
    defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
            msgs.stream().map(MessageExt::getBody).map(String::new).forEach(body -> log.info("消息内容:{}", body));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    defaultMQPushConsumer.start();
    Thread.sleep(5000L);
    defaultMQPushConsumer.shutdown();
}

消费端,需要设置Consumer对象消费消息的消息模式为MessageModel.BROADCASTING。

总结

消息广播,主要在消费端通过对Consumer对象的消息模式(MessageModel)属性设置为MessageModel.BROADCASTING。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/629948.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号