1.在pom中导入rabbitMQ整合启动场景依赖
org.springframework.boot
spring-boot-starter-amqp
2.此时容器中自动配置了RabbitAutoConfiguration类,其给容器放四个重要的的对象
@Bean public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProviderconnectionNameStrategy) @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean(RabbitOperations.class) public RabbitTemplate rabbitTemplate(RabbitProperties properties, ObjectProvider messageConverter, ObjectProvider retryTemplateCustomizers, ConnectionFactory connectionFactory) @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true) @ConditionalOnMissingBean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) @Bean @ConditionalOnSingleCandidate(RabbitTemplate.class) public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate)
3.主启动类上添加开启Rabbit注解
@EnableRabbit
4.编写配置文件
#配置rabbitMq的配置 #指定虚拟机的地址 spring.rabbitmq.host=145.85.127.158 #指定链接端口 spring.rabbitmq.port=5672 #哪个虚拟地址 spring.rabbitmq.virtual-host=/
5.使用AmqpAdmin进行创建Exchange,Queue,Binding等
@Autowired
private AmqpAdmin amqpAdmin;
//创建交换机
@Test
public void createExchange() {
//创建交换机-交换机名字,是否持久化,是否自动删除
DirectExchange directExchange = new DirectExchange("java-exchange", true, false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange创建成功:[{}]", "java-exchange");
}
//创建队列
@Test
public void createQueue() {
//创建一个队列-队列名字,是否持久,是否排他,是否自动删除
Queue queue = new Queue("java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Queue创建成功:[{}]", "java-queue");
}
//创建一个绑定关系
@Test
public void createBinding() {
// String destination,目的地
// DestinationType destinationType,目的地类型
// String exchange,交换机
// String routingKey,路由键(重要)(通过路由键判断交换机把消息给到哪个队列 )
// Map arguments自定义参数
Binding binding = new Binding("java-queue"
, Binding.DestinationType.QUEUE
, "java-exchange"
, "hello.java", null);
amqpAdmin.declareBinding(binding);
log.info("binding创建成功:[{}]", "java-binding");
}
6.如何通过RabbitTemplate收发消息
@Autowired
private RabbitTemplate rabbitTemplate;
//发送消息
// @Test
// public void sendMessage(){
// String msg = "hello world";
// //转换和发送消息,交换机名字,路由键,消息(这里还可以传递对象)
// rabbitTemplate.convertAndSend("java-exchange","hello.java",msg);
// log.info("消息发送成功:[{}]",msg);
// }
//发送消息
@Test
public void sendMessage(){
String msg = "hello world";
//这个要传递的类必须实现序列化
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setName("哈哈");
orderReturnReasonEntity.setCreateTime(new Date());
//发送对象消息可以是一个json
//转换和发送消息,交换机名字,路由键,消息(这里还可以传递对象)
rabbitTemplate.convertAndSend("java-exchange","hello.java",orderReturnReasonEntity);
log.info("消息发送成功:[{}]",orderReturnReasonEntity);
}
7.监听消息:使用@RabbitListener和@RabbitHandler
@RabbitListener(queues = {"java-queue"})//标识监听这个队列
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl implements OrderItemService {
@RabbitHandler //真正的消息队列处理在这里执行,根据这个参数OrderReturnReasonEntity的类型进行接收
public void recieveMessage1(Message message, OrderReturnReasonEntity content,
Channel channel) {
//获取消息体
System.out.println("接收到的消息:" + message + "===>类型:" + content);
byte[] body = message.getBody();
//获取消息头
MessageProperties messageProperties = message.getMessageProperties();
//进行线程休眠
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("消息处理完成:"+content.getName());
// System.out.println("接收到的消息:"+message+"===>类型:"+message.getClass());
}
@RabbitHandler //真正的消息队列处理在这里执行,根据这个参数OrderReturnReasonEntity的类型进行接收
public void recieveMessage2(Message message, OrderEntity content,
Channel channel) {
//获取消息体
System.out.println("接收到的消息:" + message + "===>类型:" + content);
byte[] body = message.getBody();
//获取消息头
MessageProperties messageProperties = message.getMessageProperties();
//进行线程休眠
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("消息处理完成:"+content.getOrderSn());
// System.out.println("接收到的消息:"+message+"===>类型:"+message.getClass());
}
}
8.保证可靠抵达配置文件:
#开启确认消息已发送到交换机,选择确认类型为交互,/confirm/iCallback调用这个函数 spring.rabbitmq.publisher-/confirm/i-type=correlated #开启发送端消息抵达队列的确认 spring.rabbitmq.publisher-returns=true #只要抵达就以异步方式优先回调这个returncallback函数 spring.rabbitmq.template.mandatory=true #手动ack消息回复 spring.rabbitmq.listener.simple.acknowledge-mode=manual
9.保证可靠抵达服务端配置:
@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;//为实现可靠抵达,服务端需要进行定制化set/confirm/iCallback,setReturnCallback
//如果容器中有消息转化对象,就用容器中的
@Bean
public MessageConverter getMessageConverter(){
return new Jackson2JsonMessageConverter();
}
//定制化消息队列模板
@PostConstruct//MyRabbitConfig对象创建完成后,执行这个方法
public void initRabbitTemplate(){
//设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("comfirm....correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");
}
});
//设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return....message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
}
});
}
}
10.保证可靠抵达ack(修改监听处理方法):
@RabbitHandler //真正的消息队列处理在这里执行,根据这个参数OrderReturnReasonEntity的类型进行接收
public void recieveMessage1(Message message, OrderReturnReasonEntity content,
Channel channel) throws IOException {
//获取消息体
System.out.println("接收到的消息:" + message + "===>类型:" + content);
byte[] body = message.getBody();
//获取消息头
MessageProperties messageProperties = message.getMessageProperties();
//进行线程休眠
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("消息处理完成:"+content.getName());
//这个在channel通道中是自增的
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// System.out.println("接收到的消息:"+message+"===>类型:"+message.getClass());
try {
//签收模式 进行判断签收
if (deliveryTag%2==0){
channel.basicAck(deliveryTag,false);
System.out.println("签收了消息:"+deliveryTag);
}else {
System.out.println("没有签收消息:"+deliveryTag);
}
} catch (IOException e) {
//网络中断
e.printStackTrace();
//退货模式
channel.basicNack(deliveryTag,false,false);
//channel.basicReject();
System.out.println("拒绝签收消息:"+deliveryTag);
}
}



