1、pom文件引入
|
2、在application.properties定义队列名和交换机名
spring.rabbitmq.host=192.168.19.23
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# 开启confirm回调 P -> Exchange 发送确认
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue 路由失败回调
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
# 签收模式为手动签收-那么需要在代码中手动ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 每个队列最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=1
# 每个队列启动的消费者数量
spring.rabbitmq.listener.simple.concurrency=1
# 每次从RabbitMQ获取的消息数量
spring.rabbitmq.listener.simple.prefetch=1
# 定义消息队列
app.rabbitmq.queue.wx.oa=wx-oa-queue
# 定义死信消息队列
app.rabbitmq.queue.wx.oa.dead.letter=wx-oa-dead-letter-queue
# 交换机
app.rabbitmq.exchange.wx.oa=wx-oa-exchange
# 死信交换机
app.rabbitmq.exchange.common.dead.letter=common-dead-letter-exchange
|
3、配置队列和交换机绑定
/**
 * Declares the RabbitMQ topology for the wx-oa messages (business queue/exchange plus a
 * dead-letter queue/exchange) and a {@link RabbitTemplate} that reports publisher confirms
 * and returned (unroutable) messages.
 */
@Slf4j
@Configuration
public class RabbitConfig {

    /** Routing key that binds the business queue to the business exchange. */
    private static final String WX_OA_ROUTING_KEY = "wx.oa.routing.key";

    /**
     * Routing key used both as the queue's {@code x-dead-letter-routing-key} argument and as the
     * binding key of the dead-letter queue; the two must match for dead letters to be routed.
     */
    private static final String WX_OA_DEAD_LETTER_ROUTING_KEY = "wx-oa-dead-letter-routing-key";

    @Autowired
    private CachingConnectionFactory connectionFactory;

    /** Business queue name. */
    @Value("${app.rabbitmq.queue.wx.oa}")
    private String wxOAQueueName;

    /** Name of the shared dead-letter exchange. */
    @Value("${app.rabbitmq.exchange.common.dead.letter}")
    private String commonDeadLetterExchange;

    /** Business exchange name. */
    @Value("${app.rabbitmq.exchange.wx.oa}")
    private String wxOAExchangeName;

    /** Dead-letter queue name. */
    @Value("${app.rabbitmq.queue.wx.oa.dead.letter}")
    private String wxOADeadLetterQueue;

    /**
     * Durable business queue configured with a dead-letter exchange and routing key.
     *
     * @return the business queue declaration
     */
    @Bean
    public Queue wxOAQueue() {
        return QueueBuilder
                .durable(wxOAQueueName)
                // Exchange that receives this queue's dead-lettered messages. This argument does
                // not create any binding: the dead-letter queue is bound to that exchange
                // explicitly in wxOADeadLetterBinding().
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                // Routing key the dead-lettered messages are republished with.
                .withArgument("x-dead-letter-routing-key", WX_OA_DEAD_LETTER_ROUTING_KEY)
                .build();
    }

    /**
     * Durable topic exchange for business messages.
     *
     * @return the business exchange declaration
     */
    @Bean
    public Exchange wxOAExchange() {
        return ExchangeBuilder
                .topicExchange(wxOAExchangeName)
                .durable(true)
                .build();
    }

    /**
     * Binds the business queue to the business exchange.
     *
     * @return the binding declaration
     */
    @Bean
    public Binding wxOABinding() {
        return BindingBuilder
                .bind(wxOAQueue())
                .to(wxOAExchange())
                .with(WX_OA_ROUTING_KEY)
                .noargs();
    }

    /**
     * Durable topic exchange that receives dead-lettered messages.
     *
     * @return the dead-letter exchange declaration
     */
    @Bean
    public Exchange commonDeadLetterExchange() {
        return ExchangeBuilder
                .topicExchange(commonDeadLetterExchange)
                .durable(true)
                .build();
    }

    /**
     * Durable queue that stores dead-lettered wx-oa messages.
     *
     * @return the dead-letter queue declaration
     */
    @Bean
    public Queue wxOADeadLetterQueue() {
        return QueueBuilder
                .durable(wxOADeadLetterQueue)
                .build();
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange using the same routing key the
     * business queue dead-letters with.
     *
     * @return the binding declaration
     */
    @Bean
    public Binding wxOADeadLetterBinding() {
        return BindingBuilder
                .bind(wxOADeadLetterQueue())
                .to(commonDeadLetterExchange())
                .with(WX_OA_DEAD_LETTER_ROUTING_KEY)
                .noargs();
    }

    /**
     * Template that serializes payloads as JSON and logs publisher confirms and returned messages.
     *
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());

        // Confirm callback: reports whether the message reached the exchange.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            // correlationData is null when the sender did not pass one, so guard before reading it.
            String msgId = correlationData == null ? null : correlationData.getId();
            if (ack) {
                log.info("消息成功发送到Exchange, msgId: {}", msgId);
            } else {
                log.warn("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
            }
        });

        // mandatory=true is required for the return callback below; otherwise a message the
        // exchange cannot route to any queue is dropped without triggering the callback.
        rabbitTemplate.setMandatory(true);

        // Return callback: failure-only, invoked when routing from exchange to queue fails.
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
                log.warn("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}",
                        exchange, routingKey, replyCode, replyText, message));

        return rabbitTemplate;
    }

    /**
     * JSON message converter used by the template.
     *
     * @return a Jackson-based converter
     */
    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }
}
|
4、消息发送测试
@Component
public class MsgSender { @Autowired
private RabbitTemplate rabbitTemplate;
//routingkey 路由键
@Value("${wx.oa.msg.mq.config.queue.routing.key}")
private String routingkey;
@Value("${app.rabbitmq.exchange.wx.oa}") String wxOAExchangeName;
public void send(WxOfficialAccountsMqMsg msg){
String id = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(id);
msg.setBusinessMsgId(id);
rabbitTemplate.convertAndSend(wxOAExchangeName, this.routingkey, msg,correlationData);
} |
5、消息接收
| @Slf4j public class MessageListener { @RabbitListener(queues = "${app.rabbitmq.queue.wx.oa}") MessageProperties properties = message.getMessageProperties(); } else { } public class MessageHelper {
/**
 * Wraps an object in a persistent AMQP message whose body is the object's JSON text.
 *
 * @param obj the object to convert; may be {@code null}
 * @return the message, or {@code null} when {@code obj} is {@code null}
 */
public static Message objToMsg(Object obj) {
    if (null == obj) {
        return null;
    }
    // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default charset, which
    // can corrupt non-ASCII text when producer and consumer run with different defaults.
    byte[] body = JsonUtil.objToStr(obj).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    Message message = MessageBuilder.withBody(body).build();
    MessageProperties props = message.getMessageProperties();
    // Mark the message persistent.
    props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    props.setContentType(MessageProperties.CONTENT_TYPE_JSON);
    return message;
}
public static @Slf4j
public class JsonUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(JsonUtil.class);
private static ObjectMapper objectMapper = new ObjectMapper();
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
/**
 * Serializes an object to its JSON string.
 *
 * @param object the value to serialize
 * @return the JSON text, or {@code null} when serialization throws
 */
public static String parseObjToJson(Object object) {
    try {
        // Method name corrected from "toJSonString".
        // NOTE(review): assumes JSONObject is fastjson's com.alibaba.fastjson.JSONObject — confirm.
        return JSONObject.toJSONString(object);
    } catch (Exception e) {
        // Pass the exception itself so the stack trace is logged, not only the message.
        LOGGER.error(e.getMessage(), e);
        return null;
    }
}
public static |



