栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

JMS(RabbitMQ)的进阶一:发送端的callback和接收端的手动确认

JMS(RabbitMQ)的进阶一:发送端的callback和接收端的手动确认

知识点

application.yml里面的publisher-returns、publisher-/confirm/i-type只是针对producer的参数;listener参数只是针对consumer端
  RabbitTemplate.waitFor/confirm/is和rabbitmq的/confirm/i、return回调只是针对producer->broker端,但凡它将消息送至broker的交换机就会call /confirm/i,
如果交换机没能将消息投入队列,则会call return. 这一切讲的是producer和broker的事,和consumer没一毛钱关系
  Consumer的ack也是同样道理,它只是consumer和mq broker之间的确认,和producer没一毛钱关系
  consumer方需要ack的若没有ack确认,则在Broker界面会看到unacked累加,一旦consumer方关闭,这个unacked就会转成ready(待发到consumer),consumer重启后就会又收到这批未ack的消息
  @RabbitListener可以通过参数指明是自动ack还是手动ack,这个参数比application.yml的listener设置的优先度高
  若同一消息发出能多条队列同时接收,用direct方式的交换机也能实现,那就是将多个队列绑定到同一个交换机同一个routerkey即可

Producer方的回调:

@Component
@Slf4j
public class RabbitMQEvent implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnsCallback{
  @Resource
  private SMSConfig smsConfig;

  
  @Override
  public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
    log.info("============消息已发送至MQ服务器==========");
    SMSBean sms= smsConfig.getSMS(correlationData.getId());
    if (ack) {
      log.info("消息发送ACK成功"+(sms==null?"":":"+sms));
    } else {
      log.error("消息发送ACK失败,原因{}", cause);
      if (sms!=null) { //假如SMS不为空,则重新投到消息队列
      }
    }
  }

  
  @Override
  public void returnedMessage(ReturnedMessage returnedMessage) {
    log.info("============消息已发送至MQ服务器,但无法投到指定队列==========");
    SMSBean sms= smsConfig.getSMS(returnedMessage.getMessage().getMessageProperties().getMessageId());
    log.info("returnedMessage: replyCode:{},routerKey:{},{}",returnedMessage.getReplyCode(),returnedMessage.getRoutingKey(),
            sms);

  }
}

发送方:

rabbitTemplate.setMandatory(true); //设置手动ack
      rabbitTemplate.setChannelTransacted(false); //关闭事务
      rabbitTemplate.set/confirm/iCallback(rabbitMQEvent);
      rabbitTemplate.setReturnsCallback(rabbitMQEvent);      ((AbstractMessageConverter)rabbitTemplate.getMessageConverter()).setCreateMessageIds(true);
//发送消息并需要对方确认
  @Override
  public void sendSMSWithACK(SMSBean sms,String... routerKey) {
   // amqpTemplate.setMandatory(true);
    ackRabbitTemplate.invoke(c->{
      Message message=ackRabbitTemplate.getMessageConverter().toMessage(sms,null);
      CorrelationData correlationData= new CorrelationData(message.getMessageProperties().getMessageId());
      smsConfig.saveSMS(sms,correlationData.getId());
      //要CorrelationDataID与MessageID一致,为了方便/confirm/i/return message回调时拿回消息体。
      ackRabbitTemplate.convertAndSend(SMSConstants.SMS_EXCHANGE_DIRECT,
              routerKey.length==0?SMSConstants.SMS_KEY_DIRECT3:routerKey[0],
              message,correlationData);
      return true;
      
    });

接收方:

 @RabbitListener(ackMode = "MANUAL",bindings = @QueueBinding(value=@Queue(SMSConstants.SMS_QUEUE_DIRECT3),exchange = @Exchange(value = SMSConstants.SMS_EXCHANGE_DIRECT,
          type = ExchangeTypes.DIRECT),key = SMSConstants.SMS_KEY_DIRECT3))
  @RabbitHandler
  protected void onReceived3(Message message,SMSBean sms, Channel channel) throws IOException, InterruptedException {
    log.info("===========onReceived3收到消息(手动ACK)========");
    if (consumer!=null) {
      consumer.accept(SMSConstants.SMS_KEY_DIRECT3+"-"+SMSConstants.SMS_QUEUE_DIRECT3, sms);
    }
    //注意,cusumer的这些应答只是跟broker(MQ中间件)之间进行,并没影响到producer。
    if (sms.getSubject().equalsIgnoreCase("1")){ //
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认
    }
    else if (sms.getSubject().equalsIgnoreCase("2")){
      //basicNack(boolean multiple, boolean requeue)
      channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//不确认,丢弃
    }
    else {
      //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//不确认,丢弃
      if (message.getMessageProperties().getRedelivered()){ //已经重投的就放弃了
        log.info("此为重投消息,直接删除:"+sms);
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
      }
      else {
        log.info("不确认,将消息重新投入队列:"+sms);
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); //不确认,并且重投
      }
    }
  }

测试:

    SMSBean sms=SMSUtil.genSMS();
    sendSMSService.sendSMSWithACK(sms,"xxxqueue");
    //投到SMS_KEY_DIRECT中,onReceived/onReceived2接收,发送方有/confirm/i回调
    sendSMSService.sendSMSWithACK(sms,SMSConstants.SMS_KEY_DIRECT);

    sms.setSubject("1");
    sendSMSService.sendSMSWithACK(sms);
    sms.setSubject("2");
    sms.setContent("此消息不被确认,丢弃");
    sendSMSService.sendSMSWithACK(sms);
    sms.setSubject("3");
    sms.setContent("此消息不被确认并重投队列");
    sendSMSService.sendSMSWithACK(sms);


代码链接,觉得有用的话请点个Star: https://gitee.com/tigera15/jms-samples/tree/master

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701664.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号