前面写过一片文章使用rocketmq-client整合RocketMQ的,这篇文章也不讲这些理论,理论还是前往RocketMQ消息类型或者其他往期文章,本文就如标题,纯粹的操一下rocketmq-spring-boot-starter这个玩意!
依赖org.apache.rocketmq rocketmq-spring-boot-starter
这里就不能单纯使用rocketmq-client了,有很多API是rocketmq-spring-boot-starter提供的,虽然底层还是调用的rocketmq-client,下文会介绍!
通用消息体@Setter
@Getter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
public class MsgTest {
private int id;
private String context;
private Date date;
}
普通消息
同步消息
同不消息也就这些API,简单讲解一下!
//发送普通同步消息-Object syncSend(String destination, Object payload) //发送普通同步消息-Message syncSend(String destination, Message> message) //发送批量普通同步消息 syncSend(String destination, Collectionmessages) //发送普通同步消息-Object,并设置发送超时时间 syncSend(String destination, Object payload, long timeout) //发送普通同步消息-Message,并设置发送超时时间 syncSend(String destination, Message> message, long timeout) //发送批量普通同步消息,并设置发送超时时间 syncSend(String destination, Collection messages, long timeout) //发送普通同步延迟消息,并设置超时,这个下文会演示 syncSend(String destination, Message> message, long timeout, int delayLevel)
@Test
void syncSendStr() {
//syncSend和send是等价的
rocketMQTemplate.syncSend("first-topic-str", "hello world test1");
//send底层还是会调用syncSend的代码
rocketMQTemplate.send("first-topic-str", MessageBuilder.withPayload("hello world test1").build());
SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", "hello world test2");
log.info("syncSend===>{}",res);
}
@Test
void syncSendPojo() {
MsgTest msg = new MsgTest(1,"hello world test3",new Date());
SendResult res = rocketMQTemplate.syncSend("first-topic-pojo", MessageBuilder.withPayload(msg).build());
log.info("syncSend===>{}",res);
}
这里存在两种消息体,一种是Object的,另一种是Message>的形式的,其实我们发送Object的时候,底层是有帮我们做转换的,其实和我们在上层调用
MessageBuilder.withPayload("hello world test1").build()
是一样的!源码如下
异步消息
//发送普通异步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback)
//发送普通异步消息-Message
asyncSend(String destination, Message> message, SendCallback sendCallback)
//发送普通异步消息-Object,并设置发送超时时间
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
//发送普通异步消息-Message,并设置发送超时时间
asyncSend(String destination, Message> message, SendCallback sendCallback, long timeout)
//发送普通异步延迟消息,并设置超时,这个下文会演示
asyncSend(String destination, Message> message, SendCallback sendCallback, long timeout,
int delayLevel)
@Test
void asyncSendStr() {
rocketMQTemplate.asyncSend("first-topic-str:tag1", "hello world test2 asyncSendStr", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步消息发送成功:{}",sendResult);
}
@Override
public void onException(Throwable throwable) {
log.info("异步消息发送失败:{}",throwable.getMessage());
}
});
}
单向消息
这里普通单向消息就只有两个操作空间,这个不用多说了,一个是Object,另一个是Message
@Test
void sendOneWayStr() {
rocketMQTemplate.sendOneWay("first-topic-str:tag1", "hello world test2 sendOneWayStr");
log.info("单向消息已发送");
}
批量消息
@Test
void asyncSendBatch() {
Message msg = MessageBuilder.withPayload("hello world test1").build();
List msgList = Arrays.asList(msg,msg,msg,msg,msg);
SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", msgList);
log.info("批量消息");
}
延迟消息
同步延迟消息
@Test
void syncSendDelayedStr() {
Message message= MessageBuilder.withPayload("syncSendDelayedStr"+new Date()).build();
SendResult res=rocketMQTemplate.syncSend("first-topic-str:tag1", message, 3000, 3);
log.info("res==>{}",res);
}
异步延迟消息
@Test
void asyncSendDelayedStr() {
//Callback
SendCallback sc=new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送异步延时消息成功");
}
@Override
public void onException(Throwable throwable) {
log.info("发送异步延时消息失败:{}",throwable.getMessage());
}
};
Message message= MessageBuilder.withPayload("asyncSendDelayedStr").build();
rocketMQTemplate.asyncSend("first-topic-str:tag1", message, sc, 3000, 3);
}
顺序消息
这里使用rocketmq-spring-boot-starter发送顺序消息就比较方便了,不像使用rocket-client那样,需要手动获取RocketMQ中当前topic的队列个数然后再通过hashCode计算对应的队列,这里底层都帮我们做好了处理!
@Test
void SendOrderStr() {
List msgList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
msgList.add(new MsgTest(100, "我是id为100的第"+(i+1)+"条消息", new Date()));
}
//msgList.add(new MsgTest(1, "我是id为1的第1条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第1条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第2条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第3条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第2条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第3条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第4条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第5条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第6条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第7条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第4条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第5条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第6条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第7条消息", new Date()));
msgList.forEach(t ->{
//rocketMQTemplate.sendoneWayOrderly("first-topic-str:tag1", t,String.valueOf(t.getId()));
//rocketMQTemplate.syncSendOrderly("first-topic-str:tag1", t, String.valueOf(t.getId()));
rocketMQTemplate.asyncSendOrderly("first-topic-str:tag1", t,String.valueOf(t.getId()), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步消息发送成功:{}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.info("异步消息发送失败:{}", throwable.getMessage());
}
});
});
}
使用for循环100条数据,或者使用注释掉的代码其实都是一样的,说明一下使用for循环100是确定id一致的时候,通过hashKey会被分配到同一个队列中,如下
上面代码共测试了三总类型,同步,异步,单向,但是异步好像顺序还是有问题,但是查看了数据,发现数据确实是在分派到一个队列,
要保证消息有顺序需要保证一下三要素!
- 消息被发送时保持顺序
- 消息被存储时保持和发送的顺序一致
- 消息被消费时保持和存储的顺序一致
至于这里异步顺序消息为什么最后孙旭有问题,原因应该是这里异步消息无需同步阻塞等待RocketMQ确认收到消息,所以for循环也不会阻塞等待,然后就把下一条数据发给了RocketMQ,但是数据在网络中传输是存在速度差异的,那么也就导致这100条虽然是有顺序的从我们程序里for循环发出去,没任何阻塞,所以for执行很快,几乎同时100条数据出去,然后100条数据在网络中传输各自有快慢,所以并不是有顺序的被RocketMQ接受所以也就导致消费的时候就是无序的了
事务消息 注意上述API中带了超时时间的是总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)



