RocketMQ顺序消息是指消息发送顺序与消费顺序一致,包括全局顺序消息和局部顺序消息,全局顺序消息可以使用一个Topic对应1个队列来实现,但是吞吐量特别低;局部顺序消息,是指在一个队列上消息有序的发送和消费,但是对应异步发送和广播消费模式不支持顺序。局部顺序消息,在发送消息时使用MessageQueueSelector让特定的消息到达特定的一个队列上,在消费消息时,使用MessageConsumerOrderly来消费消息,它可以保证有序消费。
让特定的消息到达指定的队列(消息在单个队列上有序)
@GetMapping("sendOrderly")
public Object callback() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
List orderList = ProductOrder.getOrderList();
for (int i = 0; i < orderList.size(); i++) {
ProductOrder productOrder = orderList.get(i);
Message message = new Message(JmsConfig.ORDERLY_TOPIC, "", productOrder.getOrderId()+"", productOrder.toString().getBytes());
SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, productOrder.getOrderId());
log.info("发送结果:{}|order:{}",sendResult,productOrder);
}
return new HashMap<>();
}
使用ConsumerListenerOrderly顺序消费
package com.tech.rocketmq.jms;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class OrderlyConsumer {
private DefaultMQPushConsumer consumer;
private String consumerGroup="pay_orderly_consumer_group";
public OrderlyConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(JmsConfig.ORDERLY_TOPIC,"*");
//顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
MessageExt messageExt = msgs.get(0);
try{
byte[] body = messageExt.getBody();
log.info("Receive New Messages:{}",new java.lang.String(body));
//做业务逻辑操作
return ConsumeOrderlyStatus.SUCCESS;
}catch (Exception e){
e.printStackTrace();
//暂停一会再获取
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
consumer.start();
System.out.println("consumer start...");
}
}
可以看到发送的三种消息ID,分别到达了三个队列上,如果启动三个消费者,会发现3个消费者被各自分配了1个队列上消费,在单个队列上消息是有序的。



