栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rocketmq基础:发送消息、延时消费、消费重试

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rocketmq基础:发送消息、延时消费、消费重试

rocketmq支持3种消息发送方式
//rocketmq-spring-boot-starter提供的操作rocketmq的类
@Autowired
private RocketMQTemplaterocketMQTemplate;
同步消息(sync message)

producer向broker发送消息,broker服务器返回发送结果后再继续往下执行

public void sendSyncMsg(Stringtopic,String msg){
    rocketMQTemplate.syncSend(tpic,msg);
}
异步消息(sync message)

producer向broker发送消息时指定消息发送成功以及发送异常的回调方法,producer发送消息后线程不阻塞继续往下执行,消息发送成功或失败的回调任务在新的线程中执行

public void sendAsyncMsg(String topic,String msg){
    rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
            
        @Override
        public void onException(Throwable throwable) {
            System.out.println(throwable.getMessage());
        }
    });
}
单向消息(sync message)

producer向broker发送消息不等待broker服务器的结果

public void sendOneWayMsg(String topic,String msg){
    rocketMQTemplate.sendOneWay(topic,msg);
}
rocketmq发送自定义消息类型 生产者
public void sendMsgByJson(String topic, OrderExt orderExt){
    //同步发送
    rocketMQTemplate.convertndSend(topic,orderExt);
}
消费者
@Component
@RocketMQMessageListener(topic = "rocketmq-demo-obj",consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener {
    @Override
    public void onMessage(OrderExt orderExt) {
        System.out.println(orderExt);
    }
}
rocketmq发送延迟消息

发送延迟消息只需要设置延迟时间即可,延迟时间存在18个等级(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel(level)设置与延迟时间相对于的延迟级别即可。

发送同步消息

public void sendMsgByJsonDelay(String topic, OrderExt orderExt){
    Message message =MessageBuilder.withPayload(orderExt).build();
    //第四个参数设置延迟级别
    rocketMQTemplate.syncSend(topic,message,3000,3);
    System.out.printf("send msg:%s",orderExt);
}

发送异步消息

public void sendAsyncMsgByJsonDelay(String topic, OrderExt orderExt) throws RemotingException, MQClientException, InterruptedException {
    String s = JSON.toJSONString(orderExt);
    org.apache.rocketmq.common.message.Message message = neworg.apache.rocketmq.common.message.Message(topic,s.getBytes(Charset.orName("utf-8")));
    message.setDelayTimeLevel(3);
    //异步消息使用原生方法发送
    rocketMQTemplate.getProducer().send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
        }
        @Override
        public void onException(Throwable throwable) {
            System.out.println(throwable.getMessage());
        }
    });
    System.out.printf("sent msg:%s",s);
}
rocketmq消费重试
@Component
@RocketMQMessageListener(topic = "rocketmq-demo-obj",consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println(new Date());
        System.out.println(JSON.parseObject(new String(messageExt.getBody()),OrderExt.class));
        //消费发生异常后会重复消费同一数据,默认会按第3级及之后的延时时间间隔重复消费(即第一次重复消费在首次消费10s后,第二次重复消费在第一次重复消费30s后,以此类推),可以根据消费次数终止重复消费
        if(messageExt.getReconsumeTimes()>2){
            System.out.println("停止重试,写入数据库...");
            return;
        }
        
        throw new RuntimeException("处理消息失败");
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/877051.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号