1、基础整合
1.1.POM1.2.yaml配置1.3.Producer代码1.4.Consumer代码1.5.代码测试1.6.topic与tag的指定 2、六大消息类型代码实现
2.1.基本消息样例2.2.顺序消息样例2.3.延时消息样例2.4.标签过滤消息样例2.5.SQL92过滤消息样例2.6.批量消息样例2.7.回馈消息样例 3、超时设置与MessageEx4、消息发送重试机制5、消息幂等处理6、源码地址
1、基础整合在SpringBoot启动时,Producer就会向RocketMQ注册所有的topic信息,并且topic名称不能重复。
一个监听器只能监听一个topic。
1.2.yaml配置org.apache.rocketmq rocketmq-spring-boot-starter 2.2.1
# rocketmq配置 rocketmq: #rocketmq服务地址集群由`;`分开 name-server: http://162.14.119.135:9876 #自定义的组名称 producer: group: producer_test #消息发送超时时长 send-message-timeout: 50001.3.Producer代码
@Service
@Slf4j
public class TestProducer {
@Resource
rocketmqTemplate rocketmqTemplate;
public void send() {
String text = "测试发送";
Message message = MessageBuilder.withPayload(text).build();
log.info("开始发送...");
rocketMQTemplate.send("test_topic", message);
log.info("已发送...");
}
}
1.4.Consumer代码
@Component @RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_group") @Slf4j public class TestConsumer implements RocketMQListener1.5.代码测试{ @Override public void onMessage(String message) { log.info("TestConsumer - 接受到消息:" + message); } }
通过Controller接口调用Producer发送消息到队列中,Consumer通过监听器监听是否有消息,如果有则获取成功。
使用rocketmqTemplate发送消息时没有指定设置topic及tag的参数,而是由参数destination实现,调用send方法的源码如下:
public void send(D destination, Message> message) {
this.doSend(destination, message);
}
private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) {
if (destination != null && destination.length() >= 1) {
if (payloads != null && payloads.length >= 1) {
// 分别获取topic与tag
String[] tempArr = destination.split(":", 2);
String topic = tempArr[0];
String tags = "";
if (tempArr.length > 1) {
tags = tempArr[1];
}
}
...
}
...
}
可以发现,destination参数,可以由:分开,第一个参数为topic,第二个参数为tag,默认可以不设置tag参数,tag的设置应该根据实际使用场景来绝对。
消费者监听器通过selectorexpression ="tagName"获取指定的TAG消息
// 需要获取多个tag时,使用||分隔:"a||b||c" @rocketmqMessageListener(selectorexpression ="tagName"...)2、六大消息类型代码实现 2.1.基本消息样例
基本消息样例分为以下三重:
同步消息:这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。异步消息:用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。单向发送消息:这种方式主要用在不关心发送结果的场景,例如日志发送。
baseConsumer:
@Component @RocketMQMessageListener(selectorexpression = "", topic = "base_topic", consumerGroup = "base_group") @Slf4j public class baseConsumer implements RocketMQListener{ @Override public void onMessage(String message) { log.info("基本信息案例-接受到消息:" + message); } }
baseProducer:
@Service
@Slf4j
public class baseProducer {
@Resource
RocketMQTemplate rocketMQTemplate;
public void sync() {
String text = "基本信息案例-同步发送" + System.currentTimeMillis();
log.info(text);
rocketMQTemplate.syncSend("base_topic", text);
log.info("同步发送-已发送...");
}
public void async() {
String text = "基本信息案例-异步发送" + System.currentTimeMillis();
log.info(text);
for (int a = 1; a <= 10; a++) {
rocketMQTemplate.asyncSend("base_topic", text + ",ID:" + a, new SendCallback() {
// SendCallback接收异步返回结果的回调
// 成功发送
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步发送 - 发送成功");
}
// 发送失败
@Override
public void onException(Throwable throwable) {
log.info("异步发送 - 发送失败");
throwable.printStackTrace();
}
});
}
log.info("异步发送-已发送...");
}
public void oneWay() {
String text = "基本信息案例-单向发送" + System.currentTimeMillis();
log.info(text);
rocketMQTemplate.sendOneWay("base_topic", text);
log.info("单向发送-已发送...");
}
}
Controller:
@RestController
@RequestMapping("test")
public class TestController {
@Resource
private baseProducer baseProducer;
@GetMapping("/base")
public Object base() {
// 同步发送
baseProducer.sync();
// 异步发送
baseProducer.async();
// 单向发送
baseProducer.oneWay();
return "基本消息样例";
}
}
效果:
消息有序指的是可以按照消息的发送顺序来消费(FIFO),rocketmq可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号(topic)相同的消息会被先后发送到同一个队列中,消费时,同一个订单号获取到的肯定是同一个队列。
OrderConsumer:
@Component @RocketMQMessageListener(topic = "order_topic", consumerGroup = "order_group", consumeMode = ConsumeMode.ORDERLY) @Slf4j public class OrderConsumer implements RocketMQListener{ @Override public void onMessage(String message) { log.info("顺序消息生产-接受到消息:" + message); } }
OrderProducer:
@Service
@Slf4j
public class OrderProducer {
@Resource
RocketMQTemplate rocketMQTemplate;
public void order() {
log.info("顺序消息");
try {
for (int i = 1; i <= 10; i++) {
int num = (int) (Math.random() * 10000);
// 设置一个延时,表示同一个消息先后进入到队形中
TimeUnit.MILLISECONDS.sleep(50);
log.info("顺序消息,ID:" + num);
// 第一个参数为topic,第二个参数为内容,第三个参数为Hash值,不同hash值在不同的队列中
rocketMQTemplate.syncSendOrderly("order_topic", "顺序消息,ID:" + num, "order");
}
log.info("已发送...");
} catch (Exception e) {
e.printStackTrace();
}
}
}
Controller:
@RestController
@RequestMapping("test")
public class TestController {
@Resource
private OrderProducer orderProducer;
@GetMapping("/order")
public Object order() {
orderProducer.order();
return "发送顺序消息";
}
}
2.3.延时消息样例
通过设置延时等级,实现消费者延时消费数据,比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
RocketMQ并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列。
// 固定延时,设置参数时,对应数值为:1~18 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
延时原理:
Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相应的consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;
若有则需要经历一个复杂的过程:修改消息、投递延时消息、将消息重新写入commitlog
修改消息
修改消息的Topic为SCHEDULE_TOPIC_XXXX
根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件。
延迟等级delayLevel与queueId的对应关系为:queueId = delayLevel -1
修改消息索引单元内容。将MessageTagHashCode中原本存放消息的Tag的Hash值,现修改为消息的投递时间。
投递时间是指该消息被重新修改为原Topic后再次被写入到commitlog中的时间。
投递时间 = 消息存储时间 + 延时等级时间。
消息存储时间指的是消息被发送到Broker时的时间戳。
将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
投递延时消息
Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的==投递时间,将延时消息投递到⽬标Topic中。==不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息,然后再次将消息投递到目标Topic中。
消息重新写入commitlog
延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。
这其实就是一次普通消息发送。只不过这次的消息Producer是延迟消息服务类ScheuleMessageService。
ScheduleConsumer:
@Component @RocketMQMessageListener(topic = "scheduled_topic", consumerGroup = "scheduled_group") @Slf4j public class ScheduleConsumer implements RocketMQListener{ @Override public void onMessage(String message) { log.info("延时消息-接受到消息:" + message); } }
ScheduledProducer:
@Service
@Slf4j
public class ScheduledProducer {
@Resource
RocketMQTemplate rocketMQTemplate;
public void scheduled() {
String text = "延时消息"+ System.currentTimeMillis();
log.info(text);
// 设置延时等级2,这个消息将在5s之后发送
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Message message = MessageBuilder.withPayload(text).build();
rocketMQTemplate.syncSend("scheduled_topic", message, 1000, 2);
log.info("已发送...");
}
}
Controller:
@RestController
@RequestMapping("test")
public class TestController {
@Resource
private ScheduledProducer scheduledProducer;
@GetMapping("/scheduled")
public Object scheduled() {
scheduledProducer.scheduled();
return "发送延时消息";
}
}
效果:
一个应用尽可能用一个Topic,消息子类型用tag来标识,tag可以由应用自由设置。 在使用rocketmqTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tag名称。
TagConsumer:
@Component @RocketMQMessageListener(selectorexpression = "TAG-A||TAG-B", topic = "tag_topic", consumerGroup = "tag_group") @Slf4j public class TagConsumer implements RocketMQListener{ @Override public void onMessage(String message) { log.info("标签过滤消息-接受到消息:" + message); } }
TagProducer:
@Service
@Slf4j
public class TagProducer {
@Resource
RocketMQTemplate rocketMQTemplate;
public void tag() {
String text = "标签过滤消息," + System.currentTimeMillis();
log.info(text);
// 任何类型的send方法均可以指定TAG,默认可以不指定则为*
Message message = MessageBuilder.withPayload(text).build();
rocketMQTemplate.syncSend("tag_topic:TAG-A", message);
log.info("已发送...");
}
}
Controller:
@RestController
@RequestMapping("test")
public class TestController {
@Resource
private TagProducer tagProducer;
@GetMapping("/tag")
public Object tag() {
// TAG过滤
tagProducer.tag();
return "指定标签消息";
}
}
2.5.SQL92过滤消息样例
TAG过滤消息只能有一个标签,这对于复杂的场景可能不起作用,通过SQL表达式筛选消息可以实现复杂的情况。
默认情况下,rocketmq不支持SQL92过滤形式,需要对broker进行配置
# 配置 vim /opt/rocketmq/rocketmq-4.9.2/conf/broker.conf # 在配置文件最后加上,并且需要重启 enablePropertyFilter=true # 进入目录,进行重启操作 cd /opt/rocketmq/rocketmq-4.9.2 # 关闭 sh bin/mqshutdown broker # 启动Broker,-n 指向NameServer地址,多个由`;`分开,也可以在配置文件中设置 nohup sh bin/mqbroker -n 162.14.119.135:9876 -c /opt/rocketmq/rocketmq-4.9.2/conf/broker.conf autoCreateTopicEnable=true &>/opt/rocketmq/rocketmq-4.9.2/logs/broker.log 2>&1 &
rocketmq执行一次SQL的基础语法:
语法特性:
数值比较,比如:>,>=,<,<=,BETWEEN,=;字符比较,比如:=,<>,IN;IS NULL 或者 IS NOT NULL;逻辑符号 AND,OR,NOT;
常量支持类型为:
数值,比如:123,4.1415;
字符,比如:‘abc’,必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE
使用:
通过putUserProperty(key,value)指定一个参数的值(putUserProperty可以存在多个)
Consumer通过SQL语法来筛选是否满足设置的参数条件,如果满足则消费消息
SQLConsumer:
@Component @RocketMQMessageListener(selectorType = SelectorType.SQL92, selectorexpression = "a between 0 and 6 or b > 8", topic = "sql_topic", consumerGroup = "sql_group") @Slf4j public class SQLConsumer implements RocketMQListener{ @Override public void onMessage(String message) { log.info("SQL92过滤消息-接受到消息:" + message); } }
SQLProducer:
@Service
@Slf4j
public class SQLProducer {
@Resource
RocketMQTemplate rocketMQTemplate;
public void selector() {
String text = "SQL92过滤消息" + System.currentTimeMillis();
log.info(text);
Message message = MessageBuilder.withPayload(text).build();
// 设置参数
Map map = new HashMap<>(4);
map.put("a", 2);
map.put("b", 10);
rocketMQTemplate.convertAndSend("sql_topic", message, map);
log.info("已发送...");
}
}
Controller:
@RestController
@RequestMapping("test")
public class TestController {
@Resource
private SQLProducer SQLProducer;
@GetMapping("/selector")
public Object selector() {
// SQL92过滤
SQLProducer.selector();
return "过滤消息样例";
}
}
2.6.批量消息样例
生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。
发送限制:
批量发送的消息必须具有相同的Topic。批量发送的消息必须具有相同的刷盘策略。批量发送的消息不能是延时消息与事务消息。消息的总大小不应超过4MB。
发送大小:
默认情况下,一批发送的消息总大小不能超过4MB字节,如果想超出该值,有两种解决方案:
方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送,一般不大于1M。方案二:在Producer端与Broker端修改属性。
Producer端需要在发送之前设置Producer的maxMessageSize属性
Broker端需要修改其加载的配置文件中的maxMessageSize属性
BatchConsumer:
@Component @RocketMQMessageListener(topic = "batch_topic", consumerGroup = "batch_group") @Slf4j public class BatchConsumer implements RocketMQListener{ @Override public void onMessage(String message) { log.info("批量消息-接受到消息:" + message); } }
MessageSplitter:
import org.springframework.messaging.Message; import java.util.Iterator; import java.util.List; public class MessageSplitter implements Iterator> { private final int sizeLimit = 1024 * 1024; ; private final List
messages; private int currIndex; public MessageSplitter(List messages) { this.messages = messages; // 保证单条数据的大小不大于sizeLimit messages.forEach(m -> { if (m.toString().length() > sizeLimit) { throw new RuntimeException("单挑消息不能大于" + sizeLimit + "B"); } }); } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message t = messages.get(nextIndex); totalSize = totalSize + t.toString().length(); if (totalSize > sizeLimit) { break; } } List subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } }
BatchProducer:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@Service
@Slf4j
public class BatchProducer {
@Resource
RocketMQTemplate rocketMQTemplate;
public void batch() {
String text = "批量消息";
log.info(text);
List messageList = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
messageList.add(MessageBuilder.withPayload(text + "--" + i).build());
}
log.info("开始发送...");
//把大的消息分裂成若干个小的消息
MessageSplitter splitter = new MessageSplitter(messageList);
while (splitter.hasNext()) {
List nextList = splitter.next();
SendResult result = rocketMQTemplate.syncSend("batch_topic", nextList);
if (result.getSendStatus() == SendStatus.SEND_OK) {
log.info("发送批量消息成功!消息ID为:{}", result.getMsgId());
} else {
log.info("发送批量消息失败!消息ID为:{},消息状态为:{}", result.getMsgId(), result.getSendStatus());
}
}
log.info("已发送...");
}
}
Controller:
@RestController
@RequestMapping("test")
public class TestController {
@Resource
private BatchProducer batchProducer;
@GetMapping("/batch")
public Object batch() {
// 批量消息样例
batchProducer.batch();
return "批量消息样例";
}
}
2.7.回馈消息样例
生产者通过sendAndReceive发送消息,消费者需要实现rocketmqReplyListener
如果连续通过sendAndReceive发送消息,生产者必须收到消费者的回复才能发送下一条消息。
ReplyConsumer:
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "reply_topic", consumerGroup = "reply_group") @Slf4j public class ReplyConsumer implements RocketMQReplyListener{ @Override public byte[] onMessage(String message) { log.info("接受到消息:" + message); // 返回消息到生成者 return "返回消息到生产者".getBytes(); } }
ReplyProducer:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
@Slf4j
public class ReplyProducer {
@Resource
RocketMQTemplate rocketMQTemplate;
public void reply() {
// 如果消费者没有回馈消息,则不会发送下一条消息
for (int i = 1; i <= 10; i++) {
String text = "回馈消息" + "--" + i;
log.info("发送" + text);
Object obj = rocketMQTemplate.sendAndReceive("reply_topic", text, String.class);
log.info("消费者返回的消息:" + obj);
}
}
}
Controller:
@RestController
@RequestMapping("test")
public class TestController {
@Resource
private ReplyProducer replyProducer;
@GetMapping("/reply")
public Object reply() {
// 消息事务
replyProducer.reply();
return "回馈消息样例";
}
}
效果:
消费者监听器需要实现rocketmqListener 接口,通过传入的泛型为String,则可以直接获取生产者传递的消息,如果想要获取消息的额外详情信息,需要传入泛型MessageEx
@Component @rocketmqMessageListener(topic = "topicName", consumerGroup = "groupName") public class ExConsumer implements rocketmqListener{ @Override public void onMessage(MessageExt message) { } }
发送消息时可以设置超时时间,通过配置rocketmq.producer.send-message-timeout =实现全局消息超时设置,也可以对每个发送的消息配置单独的超时时间,比如:
rocketmqTemplate.syncSend("topic", message, 1000);
2、在SpringBoot整合中,一个监听器只能监听一个topic,并且topic不能重复。
4、消息发送重试机制Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。
对于消息重投,需要注意以下几点:
生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway消息发送方式发送失败是没有重试机制的。只有普通消息具有发送重试机制,顺序消息是没有的。消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在rocketmq中是无法避免的问题。消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略 5、消息幂等处理
由于某种原因(比如:网络),导致MQ不知道消息已经被消费,再次将该消息分发给其他的消费者。(因为消息重试等机制的原因,如果一个consumer断了,rocketmq有consumer集群,会将该消息重新发给其他consumer)。
因此为了防止重复消费,需要进行幂等处理,方案如下:
6、源码地址1、 将已经处理的消息存入数据库中,每次处理前先进行查询操作,判断当前消息是否成功处理。
2、 将已经处理的消息存入redis库中,每次处理前先进行查询操作,判断当前消息是否成功处理。
源码地址:https://gitee.com/lhzlx/spring-boot-rocket-mq-demo.git



