1、导入依赖
org.springframework.boot spring-boot-starter-amqp 2.1.4.RELEASE
2、修改配置文件
spring.rabbitmq.host=192.168.3.10 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/
3、创建配置文件类
@Configuration
public class RabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
4、测试4.1、管理类测试
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange() {
DirectExchange directExchange = new DirectExchange("java.exchange",true,false);
amqpAdmin.declareExchange(directExchange);
}
@Test
public void createQueue() {
Queue queue = new Queue("java.queue", true, false, false);
amqpAdmin.declareQueue(queue);
}
4.2、消息发送与接收
消息发送:
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessage(){
rabbitTemplate.convertAndSend("java.exchange","","发送消息成功");
}
消息接收:
@RabbitListener(queues = {"java.queue"})
public void getMessage(Message message, OrderItemEntity orderItemEntity){
byte[] body = message.getBody();
System.out.println("接收到消息:"+message);
}
5、拓展:消息可靠投递5.1、修改配置文件
#开启发送端确认 spring.rabbitmq.publisher-/confirm/is=true #开启发送端消息抵达队列的确认 spring.rabbitmq.publisher-returns=true #只要抵达队列,以异步发送优先回调我们这个return/confirm/i spring.rabbitmq.template.mandatory=true #手动ack消息 spring.rabbitmq.listener.simple.acknowledge-mode=manual5.2、修改配置类,设置发送消息的回调
设置消息抵达交换机和队列的回调:
@Configuration
public class RabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@PostConstruct //相当于RabbitConfig(当前类)创建完成后执行这个方法
public void initRabbitTemplate(){
//设置确认回调
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationData="+correlationData+" ack="+ack+" cause="+cause);
}
});
//设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
});
}
}
5.3、消息接收并确认
@RabbitListener(queues = {"java.queue"})
@Service("orderItemService")
public class OrderItemServiceImpl {
//@RabbitListener(queues = {"java.queue"})
@RabbitHandler
public void getMessage(Message message, OrderItemEntity orderItemEntity, Channel channel){
byte[] body = message.getBody();
System.out.println("接收到消息:"+message);
//channel内按顺序递增
long tag = message.getMessageProperties().getDeliveryTag();
//签收货物,非批量模式
try {
if (tag%2 == 0) {
//收货
channel.basicAck(tag,false);
System.out.println("签收了");
} else {
//退货 requeue=false 丢弃 requeue=true 发回服务器,重新入队
//long deliveryTag, boolean multiple, boolean requeue
channel.basicNack(tag,false,true);
System.out.println("没有签收");
}
} catch (IOException e) {
//网络中断
e.printStackTrace();
}
}
@RabbitHandler
public void getHandlerMessage(Message message, OrderItemEntity orderItemEntity, Channel channel){
byte[] body = message.getBody();
System.out.println("接收到消息:"+message);
}
@RabbitHandler
public void getHandlerMessage2( OrderItemEntity orderItemEntity){
System.out.println("接收到消息:"+orderItemEntity);
}
}
注:@RabbitListener可以标注在类和方法上,而@RabbitHandler只能标注在方法上。两者的区别是:
@RabbitListener标注在方法上时可以单独使用,而@RabbitHandler需要结合@RabbitListener来一起使用(即@RabbitListener标注在类上指明监听的队列,@RabbitHandle标注在方法上,指明监听的队列的类型)@RabbitListener只能处理同类型对象的队列,而@RabbitHandle可以处理不同类型对象的队列。



