1. 引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置yaml
spring:
rabbitmq:
host: 192.168.56.10
port: 5672
virtual-host: /
publisher-confirms: true # 开启发送端确认
publisher-returns: true # 开启发送端消息抵达队列确认
template:
mandatory: true # 消息未能抵达队列时,以异步方式优先回调我们设置的 ReturnCallback
listener:
direct:
acknowledge-mode: manual # 手动ack消息
3. 使用java操作mq
package com.systop.gulimall.order;
import com.rabbitmq.client.ConnectionFactory;
import com.systop.common.utils.Query;
import com.systop.gulimall.order.entity.OrderEntity;
import com.systop.gulimall.order.entity.RefundInfoEntity;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Integration tests that drive RabbitMQ through Spring AMQP: declaring the
 * exchange, the queue and the binding between them, and publishing a message.
 *
 * <p>Every test talks to the broker configured for the application context,
 * so a reachable RabbitMQ instance is required.
 */
@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {

    /** Direct exchange shared by all tests in this class. */
    private static final String EXCHANGE_NAME = "java.exchange.direct";

    /** Queue that is declared and bound to {@link #EXCHANGE_NAME}. */
    private static final String QUEUE_NAME = "hello-java-queue";

    /** Routing key used both for the binding and for publishing. */
    private static final String ROUTING_KEY = "java.hello";

    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /** Publishes a sample {@code OrderEntity} to the exchange with the bound routing key. */
    @Test
    void sendMessage() {
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setReceiverName("杜宜洲");
        orderEntity.setOrderSn("213");
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, orderEntity);
        log.info("消息发布成功!!");
    }

    /** Declares the direct exchange (durable = true, autoDelete = false). */
    @Test
    void createExchanges() {
        DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME, true, false);
        amqpAdmin.declareExchange(directExchange);
        log.info("交换机[{}]创建成功", EXCHANGE_NAME);
    }

    /** Declares the queue (durable = true, exclusive = false, autoDelete = false). */
    @Test
    void createQueue() {
        // Constructor shape: Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
        Queue queue = new Queue(QUEUE_NAME, true, false, false);
        amqpAdmin.declareQueue(queue);
        log.info("队列[{}]创建成功", QUEUE_NAME);
    }

    /** Binds the queue to the exchange under {@link #ROUTING_KEY}, with no extra arguments. */
    @Test
    void binding() {
        Binding binding = new Binding(
                QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, ROUTING_KEY, null);
        amqpAdmin.declareBinding(binding);
        log.info("交换机[{}]绑定成功", EXCHANGE_NAME);
    }
}
4. 使用请求发布消息
Autowired
private RabbitTemplate rabbitTemplate;
/**
 * Publishes ten demo messages to the {@code java.exchange.direct} exchange,
 * each tagged with a fresh random {@code CorrelationData} id.
 *
 * <p>Even indexes send an {@code OrderEntity} with routing key {@code java.hello};
 * odd indexes send an {@code OrderItemEntity} with routing key {@code java.hello22}.
 * NOTE(review): no binding for {@code java.hello22} is visible here, so those
 * messages are presumably meant to exercise the returned-message path — confirm.
 *
 * @return the literal string {@code "ok"} once all sends have been issued
 */
@ResponseBody
@GetMapping("/send")
public String sendMsg (){
    int index = 0;
    while (index < 10) {
        boolean oddIndex = index % 2 != 0;
        if (oddIndex) {
            OrderItemEntity item = new OrderItemEntity();
            item.setSkuAttrsVals("dasdf");
            item.setOrderId(1L);
            CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("java.exchange.direct", "java.hello22", item, correlation);
        } else {
            OrderEntity order = new OrderEntity();
            order.setReceiverName("杜宜洲" + index);
            order.setOrderSn("213" + index);
            CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("java.exchange.direct", "java.hello", order, correlation);
            log.info("消息发布成功!!");
        }
        index++;
    }
    return "ok";
}
5. 接收消息
/**
 * Order-item service that also consumes messages from {@code hello-java-queue}.
 *
 * <p>{@code @RabbitListener} is placed on the class and {@code @RabbitHandler}
 * on the method, so the handler is selected by the converted payload type
 * ({@code OrderEntity} here).
 */
// NOTE(review): ServiceImpl is used as a raw type here; its type arguments look
// like they were lost when this snippet was pasted — restore them from the project.
@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl implements OrderItemService {

    /**
     * Handles one delivery and acknowledges it manually, one message at a time.
     *
     * <p>Demo policy: even delivery tags are acked, odd delivery tags are
     * nacked without requeue.
     *
     * @param message     raw AMQP message, used to read the delivery tag
     * @param orderEntity payload converted from the message body
     * @param channel     channel the message arrived on; used to ack/nack
     * @throws IOException declared for interface compatibility; failures of the
     *                     ack/nack calls are caught and reported inside the method
     */
    @RabbitHandler
    public void recieveMessage(Message message, OrderEntity orderEntity, Channel channel) throws IOException {
        System.out.println("接收到消息" + orderEntity);

        // The delivery tag increases sequentially within a channel.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag==>" + deliveryTag);

        try {
            if (deliveryTag % 2 == 0) {
                // Accept: multiple = false acknowledges only this delivery.
                channel.basicAck(deliveryTag, false);
                System.out.println("签收了货物..." + deliveryTag);
            } else {
                // Reject: multiple = false (only this delivery),
                // requeue = false (do not put it back on the queue).
                channel.basicNack(deliveryTag, false, false);
                System.out.println("没有签收了货物");
            }
        } catch (Exception e) {
            // Typically a network interruption or an already-closed channel.
            // Stay best-effort (do not rethrow), but never fail silently.
            System.err.println("ack/nack failed for deliveryTag " + deliveryTag + ": " + e);
        }
    }
}
6. 默认的序列化机制是 JDK 序列化,因此需要自己配置 JSON 序列化:创建一个 config 类
package com.systop.gulimall.order.config;
import com.mysql.cj.protocol.MessageListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@PostConstruct
public void initRabbitTemplate(){
System.out.println(123);
System.out.println(rabbitTemplate);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("/confirm/i...correlationData["+correlationData+"] ==> ack" +"["+b+"]");
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("Fail Message [" + message +"]" + "==>" + "回复的文本内容[" + s + "]");
}
});
}
// 这个是接收数据的时候需要配置
@Bean
public RabbitListenerContainerFactory> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}



