栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

验证RocketMQ默认重试

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

验证RocketMQ默认重试

验证RocketMQ默认重试

背景:线上有丢消息(无重试)的情况。经过排查,是因为并发时会有一个消息消费失败,但没有进行重试。检查配置无误后决定升级MQ版本。在升级版本前后经过下面的验证。发现新版MQ重试没问题,而老版MQ存在兼容问题而不能重试消息。

先说下结论,后边事对此结论的验证:
RocketMQ的默认重试次数为16次,且默认重试间隔时间为(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)

一、环境说明

老版MQ版本:V4_4_0
新版MQ版本:V4_9_3
java

二、java代码 1.生产者
@RestController
@Api(value = "测试服务", description = "测试服务")
@RequestMapping(value = "/test", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public class TestController {

  private final static Logger logger = LoggerFactory.getLogger(TestController.class);

  @Resource(name = "testReconsumProducer")
  private MqNormalProducer testReconsumProducer;

  @RequestMapping(value = "/testReconsumProducer")
  public Result testReconsumProducer() {

    Result result = Result.instance();
    String msgId = null;

    // 消息体当前时间
    MqMessage mqMessage = new MqMessage("testMessage,time:" + TimeUtils.parseDate(TimeUtils.currentTime()));

    // 发消息
    RocketSendResult rocketResult = (RocketSendResult) testReconsumProducer.sendMsg(mqMessage);

    // 获取消息的msgid
    msgId = rocketResult.getSendResult().getMsgId();
    // 生产者日志打印(因为测试的集群环境,为方便跟踪,使用slf4j记录)
    logger.error("{}生产者,测试MQ重试,rocketResult:{}", rocketResult.getSendResult().getMsgId(), JSON.toJSONString(rocketResult));

    result.setData(msgId);
    return result;
  }
}
2.消费者
@Service("testReconsumeEventHandler")
public class TestReconsumeEventHandler implements MqListener {

  private final static Logger logger = LoggerFactory.getLogger(TestReconsumeEventHandler.class);

  @Override
  public MqAction consume(MqMessage mqMessage, MqConsumeContext mqConsumeContext) {

    // 获取消息(因为做了抽象,所以才需要强转)
    RocketMessage rocketMessage = (RocketMessage) mqMessage;

    // 消费者日志打印(第一次消费会打印一次,其余每重试一次会打印一次,预计1+16次)
    logger.error("{}消费者,测试MQ重试,重新消费:{}", rocketMessage.getOriginMessage().getMsgId(), JSON.toJSONString(rocketMessage));
    
    // 直接进行重发消息
    return MqAction.ReconsumeLater;
  }
}
三、执行结果

经过查看日志。消费者一共打印了17次(1次正常消费,16次重试消费)。
注: 因为打印的时候记录了消息id,所以在kibana上查看日志的时候直接匹配此字符串进行塞选。

四、结论

RocketMQ的默认重试次数为16次,且默认重试间隔时间为(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/759233.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号