栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RocketMQ保证消息有序之自定义消息队列选择器

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMQ保证消息有序之自定义消息队列选择器

说明

这里使用的是自定义消息队列选择器保证消息的有序, 其实可以使用SelectMessageQueueByHash 来保证消息的有序,

使用SelectMessageQueueByHash 保证顺序有序的帖子看: https://zjj1994.blog.csdn.net/article/details/120883178

怎么保证全局有序?

保证全局有序有个最简单的方式,就是topic里面只有一个队列,这样就可以保证全局有序,但是有人会这样用么? 肯定不会的,因为这样用的话,性能吞吐量安全性都会非常的差.

所以都是保证局部有序,而不是全局有序.

保证顺序有序使用场景

订单支付,一个订单下来必须是有顺序的,比如说 必须先支付 然后再营销 ,然后进入 物流发订单 , 这样顺序是不能乱的.

或者聊天功能: 我们所有人发的顺序需要保证有序,不能说是你先发的,然后再别人后面才到.

所以这就是要保证局部有序.

案例

生产者发送10个订单,每个订单里面有六个步骤,每个步骤都会发送一个消息过去, 这六个步骤要求得是顺序性的.

consumer要保证消费的顺序是一样的.

所有的MQ只能保证在一个queue里面消息是有序的, 如果是Kafka的话就是Partition.

RocketMQ保证的是局部有序,而不是全局有序

什么是顺序局部有序? 顺序局部有序需要你的生产者和消费者一起配合才能做到.

消费者

注意: 注册Listener的时候 ,registerMessageListener 需要使用 MessageListenerOrderly ,这种取出来的消息是有序的. 如果用的是别的MessageListener,比如说MessageListenerConcurrently就不能保证有序了.

package org.apache.rocketmq.example.ordermessage;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe("OrderTopicTest", "*");
        
        consumer.registerMessageListener(new MessageListenerOrderly() {
            
            @Override
            public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.println("收到消息内容 " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS; // 返回成功消费标识
            }
        });

        // MessageListenerConcurrently 是乱拿的,保证不了消费顺序

//        这样是保证不了最终消费顺序的。
//        consumer.registerMessageListener(new MessageListenerConcurrently() {
//            @Override
//            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
//                for(MessageExt msg:msgs){
//                    System.out.println("收到消息内容 "+new String(msg.getBody()));
//                }
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            }
//        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}

自定义消息队列选择器

里面的注解我写的很详细了, 就不多做解释了,

另外需要注意一个坑,就是String字符串在hashCode的时候可能会出现负数,这个需要注意.

为什么会string hashCode 完了之后会出现负数? 看这个博客 :https://zjj1994.blog.csdn.net/article/details/120875055

package org.apache.rocketmq.example.ordermessage;

import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;


public class MyMessageQueueSelector implements MessageQueueSelector {

    
    @Override
    public MessageQueue select(List mqs, Message msg, Object arg) {


        // orderId取模MessageQueue的Size,获取到一个索引,这样就保证了同一个Order里面的消息存到了同一个队列

        String orderId = (String) arg; // 获取传过来的orderId

        // 进行hash操作
        //下面 & Integer.MAX_VALUE 的目的是复制hashCode出来的是负数
        int orderIdHashCode = orderId.hashCode() & Integer.MAX_VALUE;

        // hashCode值和队列的长度进行取余数,取出来一个整数
        int index = orderIdHashCode % mqs.size();

        // 通过上面取余数获取一个队列, 往这个队列里面投递消息,这样就能保证
        MessageQueue messageQueue = mqs.get(index);
        return messageQueue;
    }
}

生产者

生成500个订单, 每个订单都有4个步骤 ,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付的消息 在 下单消息 之前过来, 这样就出现业务bug了.

package org.apache.rocketmq.example.ordermessage;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.UUID;

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
            producer.start();

            for (int i = 0; i < 500; i++) { // 生成500个订单

                //生成50个订单,这里订单号就用uuid来代替了, 实际情况下每个公司的订单id生成方案是不一样的
                String orderId = UUID.randomUUID().toString();
                // 每个订单有4个步骤,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付在下单之前过来
                for (int j = 1; j <= 4; j++) {


                    // 实例化一个消息
                    Message msg =
                            new Message("OrderTopicTest", "orderTag", "KEY" + orderId,
                                    ("订单Id:" + orderId + " 步骤:" + j).getBytes(RemotingHelper.DEFAULT_CHARSET));

                    //实例化 自己编写的消息队列选择器
                    MyMessageQueueSelector myMessageQueueSelector = new MyMessageQueueSelector();

                    
                    SendResult sendResult = producer.send(msg, myMessageQueueSelector, orderId);

                    System.out.printf("%s%n", sendResult);
                }
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果 启动两个消费者

先启动两个消费者, 勾选允许多个实例,这样一个消费者就能启动两个了

一套代码启动了两个实例出来

启动生产者生产消息

这里不演示了,自行启动

查看消费者控制台

随便查看一个Consumer的控制台发现

随便看几个,发现步骤顺序都是有序的.

特殊情况!部分有序


8cf0133a-9e8d-44b6-96d9-4f30286f761a 订单中间还夹杂着一个 a2c588f8-1ec3-4de1-997b-545c43b15a33 步骤:1的消息 ,但是 8cf0133a-9e8d-44b6-96d9-4f30286f761a 步骤顺序还是有序的

虽然说 同一个订单id 的消息没有挨在一起消费, 但是 他们的顺序还是有序的. 这就是部分有序.

a2c588f8-1ec3-4de1-997b-545c43b15a33 订单也是有序的

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/340570.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号