栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RocketMQ顺序消息

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMQ顺序消息

        RocketMQ顺序消息是指消息发送顺序与消费顺序一致,包括全局顺序消息和局部顺序消息,全局顺序消息可以使用一个Topic对应1个队列来实现,但是吞吐量特别低;局部顺序消息,是指在一个队列上消息有序的发送和消费,但是对应异步发送和广播消费模式不支持顺序。局部顺序消息,在发送消息时使用MessageQueueSelector让特定的消息到达特定的一个队列上,在消费消息时,使用MessageConsumerOrderly来消费消息,它可以保证有序消费。

让特定的消息到达指定的队列(消息在单个队列上有序)

 @GetMapping("sendOrderly")
    public Object callback() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        List orderList = ProductOrder.getOrderList();
        for (int i = 0; i < orderList.size(); i++) {
            ProductOrder productOrder = orderList.get(i);
            Message message = new Message(JmsConfig.ORDERLY_TOPIC, "", productOrder.getOrderId()+"", productOrder.toString().getBytes());
            SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List mqs, Message msg, Object arg) {
                    Long orderId = (Long) arg;
                    long index = orderId % mqs.size();
                    return mqs.get((int) index);
                }
            }, productOrder.getOrderId());
            log.info("发送结果:{}|order:{}",sendResult,productOrder);
        }
        return new HashMap<>();
    }

使用ConsumerListenerOrderly顺序消费

package com.tech.rocketmq.jms;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.List;


@Slf4j
@Component
public class OrderlyConsumer {
    private DefaultMQPushConsumer consumer;
    private String consumerGroup="pay_orderly_consumer_group";

    public OrderlyConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        
        consumer.subscribe(JmsConfig.ORDERLY_TOPIC,"*");
        //顺序消费
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
                MessageExt messageExt = msgs.get(0);
                try{
                    byte[] body = messageExt.getBody();
                    log.info("Receive New Messages:{}",new java.lang.String(body));
                    //做业务逻辑操作
                    return ConsumeOrderlyStatus.SUCCESS;
                }catch (Exception e){
                    e.printStackTrace();
                    //暂停一会再获取
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
        consumer.start();
        System.out.println("consumer start...");
    }
}

可以看到发送的三种消息ID,分别到达了三个队列上,如果启动三个消费者,会发现3个消费者被各自分配了1个队列上消费,在单个队列上消息是有序的。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/572311.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号