栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【rocketmq】spring整合rocketmq入门

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【rocketmq】spring整合rocketmq入门

项目结构

父工程pom文件依赖
 
        
            
                org.springframework.boot
                spring-boot-dependencies
                2.6.4
                pom
                import
            
        
    

    
        
            org.springframework.boot
            spring-boot-starter
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-test
        
        
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.2.2
        
        
            org.projectlombok
            lombok
        
    
 普通消费者-生产者模式
生产者同步发送生产者异步发送生产者单向发送
要等到有返回消息才能继续发送不必等到返回消息,会另开一条线程等返回消息不会接受返回消息
  • 同步发送消息

1、生产者

@Component
public class ProducerSimple {
    @Autowired
    RocketMQTemplate rocketMQTemplate;
    
    public void sendSyncMsg(String topic,String message){
        rocketMQTemplate.syncSend(topic,message);
    }
}

1.1测试生产者

@SpringBootTest
class ProducerSimpleTest {
    @Autowired
    ProducerSimple producerSimple;

    @Test
    public void testSendSyncMsg(){
        producerSimple.sendSyncMsg("my-topic","第一条测试测试");
    }
}

2、消费者

@Component
@RocketMQMessageListener(topic = "my-topic",consumerGroup = "consumer-group")
public class ConsumerSimple implements RocketMQListener {
    
    public void onMessage(String s) {
        //接受到消息,业务逻辑
        System.out.println("消费者监听消息:"+s);
    }
}

2.1测试

运行spring入口文件即可 

  • 异步发送消息

1、生产者

    public void sendAsyncMsg(String topic,String message){
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            //发送成功
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功:"+sendResult);
            }
            //发送失败
            public void onException(Throwable throwable) {
                System.out.println("发送失败:"+throwable);
            }
        });
    }

1.1生产者测试

    @Test
    public void testSendAsyncMsg(){
        producerSimple.sendAsyncMsg("my-topic","第一条测试测试");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

2、消费者

同上消费者

  • 单向发送

生产者

    public void sendOneMsg(String topic,String msg){
        this.rocketMQTemplate.sendOneWay(topic,msg);
    }
  • 自定义消息体

1、实体类

@Data
@NoArgsConstructor
@ToString
public class OrderExt implements Serializable {
    private String id;
    private Date createTime;
    private Long money;
    private String title;
}

2、生产者

    public void sendMyByJson(String topic, OrderExt orderExt){
        //同步方法
        rocketMQTemplate.convertAndSend(topic,orderExt);
    }

3、消费者

@Component
@RocketMQMessageListener(topic = "my-topic-obj",consumerGroup = "consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener {
    public void onMessage(OrderExt orderExt) {
        System.out.println(orderExt);
    }
}
延迟发送消息

延迟发送时间有18个等级

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 6h 12h

生产者

同步发送延迟消息:message是spring的

异步发送延迟消息:message是rocketmq的

    
    public void sendDelayMsg(String topic,OrderExt orderExt){
        //构建消息体
        Message message = MessageBuilder.withPayload(orderExt).build();
        rocketMQTemplate.syncSend(topic,message,10000,3);
        System.out.println("send message:"+orderExt);
    }

    
    @SneakyThrows
    public void sendAsyncDelayMsg(String topic, OrderExt orderExt){
        String orderStr = JSON.toJSONString(orderExt);
        //构建消息体
        org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(topic, orderStr.getBytes("UTF-8"));
        this.rocketMQTemplate.getProducer().send(message, new SendCallback() {
            public void onSuccess(SendResult sendResult) {
                System.out.println("成功:"+sendResult);
            }
            public void onException(Throwable throwable) {
                System.out.println("失败:"+throwable);
            }
        },10000);
        System.out.println("send message:"+message);
    }
消息重试

消息重试也是遵循18个等级去重试

MessageExt是来自import org.apache.rocketmq.common.message.MessageExt;

@Component
@RocketMQMessageListener(topic = "my-topic-obj",consumerGroup = "consumer-group-obj")
public class ConsumerTry implements RocketMQListener {
    public void onMessage(MessageExt messageExt) {
        //获取重试次数
        int times = messageExt.getReconsumeTimes();
        if (times>3){
            //写入数据库的操作
            return;
        }
        throw new RuntimeException(String.format("尝试第%d次失败",times));
    }
}
其他:

消费者

集群模式:

生产者发送消息到queue只能被1个消费者消费

广播模式:

生产者发送消费到queue能被多个消费者消费

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/827544.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号