栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RoctetMQ 如何保证顺序消费

RoctetMQ 如何保证顺序消费

文章目录

1:背景2:发送端3:消费端

1:背景

在一些特殊的场景下,需要保证消息的顺序性,rocketMQ 提供了确实有效的解决方法。需要发送端和消费端同时保证才行。
总结下来,需要满足下面几个条件:
1:同一个Topic
2:同一个Queue
3:发消息的时候用同一个线程发送消息
3:消费消息的时候用同一个线程消费一个 Queue 里面的消息,或者用提供的消息选择器 MessageListenerOrderly

2:发送端

示例代码:

    
    private static void orderSend(DefaultMQProducer producer) throws Exception {
        Message message = new Message("myTopic", "我发送的第一条消息".getBytes());
        // 消息 queue 选择器,向 topic 中的那个 queue 去写消息,返回选择好的 queue
        MessageQueueSelector selector = new MessageQueueSelector() {
            @Override
            // List mqs : topic 下的 queue 列表
            // Message msg : 具体要发的那条消息
            // Object arg :外面的 arg 参数,会被传递进来,自定义参数
            public MessageQueue select(List mqs, Message msg, Object arg) {
                //假设选择第一个 queue
                return mqs.get((Integer) arg);
            }
        };
        producer.send(message, selector, 1, 2000);

        // 或者使用提供的默认选择器实现
        //producer.send(message,new SelectMessageQueueByHash(),1);
        //producer.send(message,new SelectMessageQueueByRandom(),1);
        // 根据机房去路由选择:未实现,需要根据源码自己去实现
        //producer.send(message,new SelectMessageQueueByMachineRoom(),1);
    }

下面解释为什么要同一个 Topic,同一个 Queue,发消息的时候用同一个线程发送消息
原因:
rocketMQ 的物理结构上是没有 Topic 这个概念的,是以 Queue 为物理单位的,在逻辑上,一个 Topic 包含 4 个 Queue,Queue 队列大家都知道,FIFO,先进先出的队列。 因此在 同一个 Topic 下的 同一个 Queue 里是可以保证 消息发送的顺序性的。至于为什么用同一个线程来发送消息,是为了防止多线程环境下带来的不确定性。

3:消费端

示例代码:

    
    private static void oneCost(DefaultMQPushConsumer consumer){
        // 设置开启消费线程数,最大和最小
        //consumer.setConsumeThreadMax();
        //consumer.setConsumeThreadMin();
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
                // 消费消息
                consumerMessage(msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
    }

MessageListenerConcurrently 是并发的开启多个线程为同一个 Queue 消费消息,MessageListenerOrderly 只为每一个 Queue 开启一个线程。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735147.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号