栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot使用rocketmq-spring-boot-starter整合RocketMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot使用rocketmq-spring-boot-starter整合RocketMQ

前言

前面写过一片文章使用rocketmq-client整合RocketMQ的,这篇文章也不讲这些理论,理论还是前往RocketMQ消息类型或者其他往期文章,本文就如标题,纯粹的操一下rocketmq-spring-boot-starter这个玩意!

依赖
		
            org.apache.rocketmq
            rocketmq-spring-boot-starter
        

这里就不能单纯使用rocketmq-client了,有很多API是rocketmq-spring-boot-starter提供的,虽然底层还是调用的rocketmq-client,下文会介绍!

通用消息体
@Setter
@Getter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
public class MsgTest {

    private int id;

    private String context;

    private Date date;

}
普通消息

同步消息

同不消息也就这些API,简单讲解一下!

//发送普通同步消息-Object
syncSend(String destination, Object payload)
//发送普通同步消息-Message
syncSend(String destination, Message message)
//发送批量普通同步消息
syncSend(String destination, Collection messages)
//发送普通同步消息-Object,并设置发送超时时间
syncSend(String destination, Object payload, long timeout)
//发送普通同步消息-Message,并设置发送超时时间
syncSend(String destination, Message message, long timeout)
//发送批量普通同步消息,并设置发送超时时间
syncSend(String destination, Collection messages, long timeout)
//发送普通同步延迟消息,并设置超时,这个下文会演示
syncSend(String destination, Message message, long timeout, int delayLevel)
	
	@Test
    void syncSendStr() {
    	//syncSend和send是等价的
        rocketMQTemplate.syncSend("first-topic-str", "hello world test1");
        //send底层还是会调用syncSend的代码
        rocketMQTemplate.send("first-topic-str", MessageBuilder.withPayload("hello world test1").build());
        SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", "hello world test2");
        log.info("syncSend===>{}",res);
    }


	
    @Test
    void syncSendPojo() {
        MsgTest msg = new MsgTest(1,"hello world test3",new Date());
        SendResult res = rocketMQTemplate.syncSend("first-topic-pojo", MessageBuilder.withPayload(msg).build());
        log.info("syncSend===>{}",res);
    }

这里存在两种消息体,一种是Object的,另一种是Message的形式的,其实我们发送Object的时候,底层是有帮我们做转换的,其实和我们在上层调用

MessageBuilder.withPayload("hello world test1").build()

是一样的!源码如下

异步消息

//发送普通异步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback)
//发送普通异步消息-Message
asyncSend(String destination, Message message, SendCallback sendCallback)
//发送普通异步消息-Object,并设置发送超时时间
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
//发送普通异步消息-Message,并设置发送超时时间
asyncSend(String destination, Message message, SendCallback sendCallback, long timeout)
//发送普通异步延迟消息,并设置超时,这个下文会演示
asyncSend(String destination, Message message, SendCallback sendCallback, long timeout,
        int delayLevel) 
	
     @Test
    void asyncSendStr() {
        rocketMQTemplate.asyncSend("first-topic-str:tag1", "hello world test2 asyncSendStr", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步消息发送成功:{}",sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                log.info("异步消息发送失败:{}",throwable.getMessage());
            }
        });
    }

单向消息

这里普通单向消息就只有两个操作空间,这个不用多说了,一个是Object,另一个是Message

	
    @Test
    void sendOneWayStr() {
        rocketMQTemplate.sendOneWay("first-topic-str:tag1", "hello world test2 sendOneWayStr");
        log.info("单向消息已发送");
    }
批量消息
	
    @Test
    void asyncSendBatch() {
        Message msg = MessageBuilder.withPayload("hello world test1").build();
        List msgList = Arrays.asList(msg,msg,msg,msg,msg);
        SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", msgList);
        log.info("批量消息");
    }
延迟消息

同步延迟消息

	
    @Test
    void syncSendDelayedStr() {
        Message message= MessageBuilder.withPayload("syncSendDelayedStr"+new Date()).build();
        
        SendResult res=rocketMQTemplate.syncSend("first-topic-str:tag1", message, 3000, 3);
        log.info("res==>{}",res);
    }

异步延迟消息

	 
    @Test
    void asyncSendDelayedStr() {
        //Callback
        SendCallback sc=new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送异步延时消息成功");
            }
            @Override
            public void onException(Throwable throwable) {
                log.info("发送异步延时消息失败:{}",throwable.getMessage());
            }
        };

        Message message= MessageBuilder.withPayload("asyncSendDelayedStr").build();
        rocketMQTemplate.asyncSend("first-topic-str:tag1", message, sc, 3000, 3);
    }
顺序消息

这里使用rocketmq-spring-boot-starter发送顺序消息就比较方便了,不像使用rocket-client那样,需要手动获取RocketMQ中当前topic的队列个数然后再通过hashCode计算对应的队列,这里底层都帮我们做好了处理!

	
    @Test
    void SendOrderStr() {
        List msgList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            msgList.add(new MsgTest(100, "我是id为100的第"+(i+1)+"条消息", new Date()));
        }
        //msgList.add(new MsgTest(1, "我是id为1的第1条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第1条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第2条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第3条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第2条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第3条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第4条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第5条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第6条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第7条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第4条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第5条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第6条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第7条消息", new Date()));
        msgList.forEach(t ->{
            //rocketMQTemplate.sendoneWayOrderly("first-topic-str:tag1", t,String.valueOf(t.getId()));
            //rocketMQTemplate.syncSendOrderly("first-topic-str:tag1", t, String.valueOf(t.getId()));
            rocketMQTemplate.asyncSendOrderly("first-topic-str:tag1", t,String.valueOf(t.getId()), new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("异步消息发送成功:{}", sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    log.info("异步消息发送失败:{}", throwable.getMessage());
                }
            });

        });
    }

使用for循环100条数据,或者使用注释掉的代码其实都是一样的,说明一下使用for循环100是确定id一致的时候,通过hashKey会被分配到同一个队列中,如下

上面代码共测试了三总类型,同步,异步,单向,但是异步好像顺序还是有问题,但是查看了数据,发现数据确实是在分派到一个队列,


要保证消息有顺序需要保证一下三要素!

  • 消息被发送时保持顺序
  • 消息被存储时保持和发送的顺序一致
  • 消息被消费时保持和存储的顺序一致

至于这里异步顺序消息为什么最后孙旭有问题,原因应该是这里异步消息无需同步阻塞等待RocketMQ确认收到消息,所以for循环也不会阻塞等待,然后就把下一条数据发给了RocketMQ,但是数据在网络中传输是存在速度差异的,那么也就导致这100条虽然是有顺序的从我们程序里for循环发出去,没任何阻塞,所以for执行很快,几乎同时100条数据出去,然后100条数据在网络中传输各自有快慢,所以并不是有顺序的被RocketMQ接受所以也就导致消费的时候就是无序的了

事务消息 注意

上述API中带了超时时间的是总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/666513.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号