application.yml里面的publisher-returns、publisher-/confirm/i-type只是针对producer的参数;listener参数只是针对consumer端
RabbitTemplate.waitFor/confirm/is和rabbitmq的/confirm/i、return回调只是针对producer->broker端,但凡它将消息送至broker的交换机就会call /confirm/i,
如果交换机没能将消息投入队列,则会call return. 这一切讲的是producer和broker的事,和consumer没一毛钱关系
Consumer的ack也是同样道理,它只是consumer和mq broker之间的确认,和producer没一毛钱关系
consumer方需要ack的若没有ack确认,则在Broker界面会看到unacked累加,一旦consumer方关闭,这个unacked就会转成ready(待发到consumer),consumer重启后就会又收到这批未ack的消息
@RabbitListener可以通过参数指明是自动ack还是手动ack,这个参数比application.yml的listener设置的优先度高
若同一消息发出能多条队列同时接收,用direct方式的交换机也能实现,那就是将多个队列绑定到同一个交换机同一个routerkey即可
Producer方的回调:
@Component
@Slf4j
public class RabbitMQEvent implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnsCallback{
@Resource
private SMSConfig smsConfig;
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
log.info("============消息已发送至MQ服务器==========");
SMSBean sms= smsConfig.getSMS(correlationData.getId());
if (ack) {
log.info("消息发送ACK成功"+(sms==null?"":":"+sms));
} else {
log.error("消息发送ACK失败,原因{}", cause);
if (sms!=null) { //假如SMS不为空,则重新投到消息队列
}
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("============消息已发送至MQ服务器,但无法投到指定队列==========");
SMSBean sms= smsConfig.getSMS(returnedMessage.getMessage().getMessageProperties().getMessageId());
log.info("returnedMessage: replyCode:{},routerKey:{},{}",returnedMessage.getReplyCode(),returnedMessage.getRoutingKey(),
sms);
}
}
发送方:
rabbitTemplate.setMandatory(true); //设置手动ack
rabbitTemplate.setChannelTransacted(false); //关闭事务
rabbitTemplate.set/confirm/iCallback(rabbitMQEvent);
rabbitTemplate.setReturnsCallback(rabbitMQEvent); ((AbstractMessageConverter)rabbitTemplate.getMessageConverter()).setCreateMessageIds(true);
//发送消息并需要对方确认
@Override
public void sendSMSWithACK(SMSBean sms,String... routerKey) {
// amqpTemplate.setMandatory(true);
ackRabbitTemplate.invoke(c->{
Message message=ackRabbitTemplate.getMessageConverter().toMessage(sms,null);
CorrelationData correlationData= new CorrelationData(message.getMessageProperties().getMessageId());
smsConfig.saveSMS(sms,correlationData.getId());
//要CorrelationDataID与MessageID一致,为了方便/confirm/i/return message回调时拿回消息体。
ackRabbitTemplate.convertAndSend(SMSConstants.SMS_EXCHANGE_DIRECT,
routerKey.length==0?SMSConstants.SMS_KEY_DIRECT3:routerKey[0],
message,correlationData);
return true;
});
接收方:
@RabbitListener(ackMode = "MANUAL",bindings = @QueueBinding(value=@Queue(SMSConstants.SMS_QUEUE_DIRECT3),exchange = @Exchange(value = SMSConstants.SMS_EXCHANGE_DIRECT,
type = ExchangeTypes.DIRECT),key = SMSConstants.SMS_KEY_DIRECT3))
@RabbitHandler
protected void onReceived3(Message message,SMSBean sms, Channel channel) throws IOException, InterruptedException {
log.info("===========onReceived3收到消息(手动ACK)========");
if (consumer!=null) {
consumer.accept(SMSConstants.SMS_KEY_DIRECT3+"-"+SMSConstants.SMS_QUEUE_DIRECT3, sms);
}
//注意,cusumer的这些应答只是跟broker(MQ中间件)之间进行,并没影响到producer。
if (sms.getSubject().equalsIgnoreCase("1")){ //
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认
}
else if (sms.getSubject().equalsIgnoreCase("2")){
//basicNack(boolean multiple, boolean requeue)
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//不确认,丢弃
}
else {
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//不确认,丢弃
if (message.getMessageProperties().getRedelivered()){ //已经重投的就放弃了
log.info("此为重投消息,直接删除:"+sms);
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
else {
log.info("不确认,将消息重新投入队列:"+sms);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); //不确认,并且重投
}
}
}
测试:
SMSBean sms=SMSUtil.genSMS();
sendSMSService.sendSMSWithACK(sms,"xxxqueue");
//投到SMS_KEY_DIRECT中,onReceived/onReceived2接收,发送方有/confirm/i回调
sendSMSService.sendSMSWithACK(sms,SMSConstants.SMS_KEY_DIRECT);
sms.setSubject("1");
sendSMSService.sendSMSWithACK(sms);
sms.setSubject("2");
sms.setContent("此消息不被确认,丢弃");
sendSMSService.sendSMSWithACK(sms);
sms.setSubject("3");
sms.setContent("此消息不被确认并重投队列");
sendSMSService.sendSMSWithACK(sms);
代码链接,觉得有用的话请点个Star: https://gitee.com/tigera15/jms-samples/tree/master



