1 github:源码地址 2 rabbitmq05 子工程1.ConfirmCallback、ReturnsCallback回调
2.事务
3.Confirm确认模式
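pom.xml
<project>
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.yzm</groupId>
        <artifactId>rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <artifactId>rabbitmq05</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>rabbitmq05</name>
    <description>Demo project for Spring Boot</description>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>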
项目结构
application.yml
spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    # Correlated publisher confirms: the broker's ack/nack is delivered to the
    # RabbitTemplate ConfirmCallback together with the CorrelationData that was sent.
    publisher-confirm-type: correlated
    # Have unroutable (mandatory) messages returned to the ReturnsCallback.
    publisher-returns: true
3 消息回调
配置类
package com.yzm.rabbitmq05.config;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
@Configuration
@EnableScheduling
public class RabbitConfig {
@Bean("rabbit")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
//数据转换为json存入消息队列
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.set/confirm/iCallback(/confirm/iCallback());
template.setReturnsCallback(returnCallback());
template.setMandatory(true);
return template;
}
public static RabbitTemplate./confirm/iCallback /confirm/iCallback() {
return new RabbitTemplate./confirm/iCallback() {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
// ack判断消息发送到交换机是否成功
System.out.println("回调id:" + correlationData.getId());
if (ack) {
// 消息发送成功到达交换机
// ...
System.out.println("消息成功到达交换机");
} else {
System.out.println("消息到达交换机失败");
System.out.println("错误信息:" + cause);
}
}
};
}
public static RabbitTemplate.ReturnsCallback returnCallback() {
return new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// 该交换机没有路由键匹配对应的消息队列
// 如果信息走了该回调,就不会走/confirm/i回调了
System.out.println("交换机:" + returnedMessage.getExchange());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
System.out.println("消息主体 : " + returnedMessage.getMessage());
System.out.println("回复代码 : " + returnedMessage.getReplyCode());
System.out.println("描述:" + returnedMessage.getReplyText());
}
};
}
}
生产者
package com.yzm.rabbitmq05.service;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.UUID;
/**
 * Scheduled producer that publishes a single text message through the {@code "rabbit"} template.
 */
@Component
public class SenderService {

    @Resource(name = "rabbit")
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes one message to {@code callback.exchange} with routing key {@code callback.a.yzm}.
     * First run is 5 seconds after startup, then one hour after each completed run.
     */
    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void sendA() {
        final String payload = "Hello world! @yzm";
        // A random UUID gives every publish its own id for the confirm callback to report.
        final String correlationId = UUID.randomUUID().toString();

        System.out.println(" [ 生产者 ] Sent ==> '" + payload + "'");
        rabbitTemplate.convertAndSend(
                "callback.exchange",
                "callback.a.yzm",
                payload,
                new CorrelationData(correlationId));
    }
}
消费者
package com.yzm.rabbitmq05.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer side of the callback demo.
 */
@Component
public class ReceiverService {

    /**
     * Prints every message delivered to queue {@code callback-a}, which is bound to the topic
     * exchange {@code callback.exchange} with keys {@code callback.a.*} and {@code callback.*.a}.
     *
     * @param message the raw AMQP message; its body is printed as text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "callback-a"),
            exchange = @Exchange(value = "callback.exchange", type = ExchangeTypes.TOPIC),
            key = {"callback.a.*", "callback.*.a"}
    ))
    public void receiveA(Message message) {
        final String text = new String(message.getBody());
        System.out.println(" [ 消费者@A号 ] Received ==> '" + text + "'");
    }
}
运行结果:
- 消息正确到达交换机触发回调
修改生产者、指定一个不存在的交换机
/**
 * Publishes one message to {@code yzm.callback.exchange}; the surrounding walkthrough uses this
 * name as an exchange that does not exist, to show the failed-confirm path.
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
public void sendA() {
    final String payload = "Hello world! @yzm";
    // A random UUID gives every publish its own id for the confirm callback to report.
    final String correlationId = UUID.randomUUID().toString();

    System.out.println(" [ 生产者 ] Sent ==> '" + payload + "'");
    rabbitTemplate.convertAndSend(
            "yzm.callback.exchange",
            "callback.a.yzm",
            payload,
            new CorrelationData(correlationId));
}
运行结果:
- 消息找不到交换机触发回调
修改生产者,指定一个不存在的路由键
/**
 * Publishes one message to {@code callback.exchange} with routing key {@code callback.yzm.yzm};
 * the walkthrough uses this key as one that no queue is bound with, to show the returned-message path.
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
public void sendA() {
    final String payload = "Hello world! @yzm";
    // A random UUID gives every publish its own id for the confirm callback to report.
    final String correlationId = UUID.randomUUID().toString();

    System.out.println(" [ 生产者 ] Sent ==> '" + payload + "'");
    rabbitTemplate.convertAndSend(
            "callback.exchange",
            "callback.yzm.yzm",
            payload,
            new CorrelationData(correlationId));
}
运行结果:
- 消息路由失败触发回调
路由失败还可以通过添加监听器处理
修改配置类
@Configuration
@EnableScheduling
public class RabbitConfig {
/**
 * Exposes a raw channel, opened on a connection from the given factory, as bean {@code "channel"}.
 *
 * @param connectionFactory factory the connection is obtained from
 * @return a channel created with the transactional flag set to {@code false}
 */
@Bean(name = "channel")
public Channel channel(ConnectionFactory connectionFactory) {
    // Passing true instead would create the channel in transactional mode.
    final boolean transactional = false;
    return connectionFactory.createConnection().createChannel(transactional);
}
...
}
生产者
package com.yzm.rabbitmq05.service;
import com.rabbitmq.client.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.UUID;
/**
 * Producer for the returned-message demo: {@link #sendA()} publishes through the
 * {@code "rabbit"} template, {@link #sendB()} publishes directly on the raw {@code "channel"} bean.
 */
@Component
public class SenderService {

    @Resource(name = "rabbit")
    private RabbitTemplate rabbitTemplate;

    @Resource(name = "channel")
    private Channel channel;

    /** Set once the return listener has been attached to the shared channel. */
    private boolean returnListenerRegistered;

    /**
     * Publishes one message through the template with a routing key that no queue is bound with.
     * Scheduling is currently disabled for this method.
     */
    // @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void sendA() {
        // Globally unique id for the confirm callback to report.
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        String message = "Hello world! @yzm";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        rabbitTemplate.convertAndSend("callback.exchange", "callback.yzm.yzm", message, correlationData);
    }

    /**
     * Publishes one mandatory message directly on the channel; if it cannot be routed the broker
     * returns it and the registered return listener prints its details.
     *
     * @throws IOException if publishing on the channel fails
     */
    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void sendB() throws IOException {
        registerReturnListenerOnce();

        String message = "Hello world! @yzm";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        // mandatory = true: ask the broker to return the message when it is unroutable.
        channel.basicPublish("callback.exchange", "callback.yzm.yzm", true,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Attaches the return listener to the shared channel the first time it is needed.
     * Registering it on every scheduled run would stack up listeners on the same channel
     * and print each returned message once per previous run.
     */
    private synchronized void registerReturnListenerOnce() {
        if (returnListenerRegistered) {
            return;
        }
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        });
        returnListenerRegistered = true;
    }
}
4 事务运行结果:
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
channel.txSelect()声明启动事务模式;
channel.txCommit()提交事务;
channel.txRollback()回滚事务;
事务createChannel(true)跟publisher-confirm-type: correlated不能同时设置
修改配置
spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    # Publisher confirms are switched off for the transaction demo.
    publisher-confirm-type: none
    # Have unroutable (mandatory) messages returned to the ReturnsCallback.
    publisher-returns: true
/**
 * Exposes a raw channel, opened on a connection from the given factory, as bean {@code "channel"}.
 *
 * @param connectionFactory factory the connection is obtained from
 * @return a channel created with the transactional flag set to {@code true}
 */
@Bean(name = "channel")
public Channel channel(ConnectionFactory connectionFactory) {
    // true requests a transactional channel.
    final boolean transactional = true;
    return connectionFactory.createConnection().createChannel(transactional);
}
生产者
/**
 * Publishes one message to {@code tx.exchange} inside a channel transaction and rolls the
 * transaction back if anything fails before the commit.
 *
 * @throws IOException if the rollback itself fails
 */
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
public void sendC() throws IOException {
    String message = "Hello world! @yzm";
    System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
    try {
        // Put the channel in transactional mode.
        channel.txSelect();
        channel.basicPublish("tx.exchange", "tx.yzm", MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // Demo toggle: uncomment to force a failure before the commit.
        //int i = 1/0;
        channel.txCommit();
    } catch (Exception e) {
        // Report why the publish failed instead of discarding the cause, then undo it.
        System.err.println("Transactional publish failed, rolling back: " + e);
        channel.txRollback();
    }
}
消费者
/**
 * Prints every message delivered to queue {@code tx-a}, which is bound to the direct
 * exchange {@code tx.exchange} with key {@code tx.yzm}.
 *
 * @param message the raw AMQP message; its body is printed as text
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "tx-a"),
        exchange = @Exchange(value = "tx.exchange", type = ExchangeTypes.DIRECT),
        key = {"tx.yzm"}
))
public void receiveC(Message message) {
    final String text = new String(message.getBody());
    System.out.println(" [ 消费者@C号 ] Received ==> '" + text + "'");
}
5 Confirm确认模式无异常
制造异常
消息回滚,消费者没有消息可消费
Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。
Confirm的三种实现方式:
channel.waitForConfirms()普通发送方确认模式;
channel.waitForConfirmsOrDie()批量确认模式;
channel.addConfirmListener()异步监听发送方确认模式;
修改配置
spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    # Correlated publisher confirms are switched back on for the confirm-mode demo.
    publisher-confirm-type: correlated
    # Have unroutable (mandatory) messages returned to the ReturnsCallback.
    publisher-returns: true
/**
 * Exposes a raw channel, opened on a connection from the given factory, as bean {@code "channel"}.
 *
 * @param connectionFactory factory the connection is obtained from
 * @return a channel created with the transactional flag set to {@code false}
 */
@Bean(name = "channel")
public Channel channel(ConnectionFactory connectionFactory) {
    // Passing true instead would create the channel in transactional mode.
    final boolean transactional = false;
    return connectionFactory.createConnection().createChannel(transactional);
}
生产者、普通Confirm模式
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
public void sendD() {
try {
String message = "Hello world! @yzm";
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
// 开启/confirm/i确认模式
channel./confirm/iSelect();
// 发送消息
channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 等待消息被确认
if (channel.waitFor/confirm/is()) {
System.out.println("消息发送成功");
} else {
// 返回false可以进行补发。
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// channel.waitForConfirms 可能返回超时异常
// 可以进行补发。
}
}
消费者
/**
 * Prints every message delivered to the queue bound to the fanout exchange declared below.
 *
 * <p>NOTE(review): the queue and exchange names look mangled by a text substitution (probably
 * "confirm-a" / "confirm.exchange" originally); they are kept unchanged so they keep matching
 * the names the producers publish to.
 *
 * @param message the raw AMQP message; its body is printed as text
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "/confirm/i-a"),
        exchange = @Exchange(value = "/confirm/i.exchange", type = ExchangeTypes.FANOUT)
))
public void receiveD(Message message) {
    final String text = new String(message.getBody());
    System.out.println(" [ 消费者@D号 ] Received ==> '" + text + "'");
}
运行结果:
生产者生产消息并发送到Broker,Broker接收并处理(路由、入队)后向生产者返回确认,生产者收到确认;消费者随后消费该消息。注意:发送方确认来自Broker,而不是消费者。
生产者、批量Confirm模式;消费者不变
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
public void sendE() {
try {
StringBuilder message = new StringBuilder("Hello world! @yzm");
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
// 开启/confirm/i确认模式
channel./confirm/iSelect();
// 发送消息
for (int i = 1; i <= 5; i++) {
message.append("_").append(i);
channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes());
}
// 阻塞线程,等待消息被确认。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。
channel.waitFor/confirm/isOrDie();
} catch (InterruptedException e) {
// 可以进行补发。
} catch (IOException e) {
//
}
System.out.println("全部执行完成");
}
运行结果:
生产者、异步Confirm模式;消费者不变
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
public void sendF() {
try {
StringBuilder message = new StringBuilder("Hello world! @yzm");
System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
// 开启/confirm/i确认模式
channel./confirm/iSelect();
// 发送消息
for (int i = 1; i <= 5; i++) {
message.append("_").append(i);
channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes());
}
//异步监听确认和未确认的消息
channel.add/confirm/iListener(new /confirm/iListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("已确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple);
}
});
} catch (IOException e) {
//
}
}
运行结果:



