- 一、依赖配置
- 1. 引入依赖
- 2. 配置文件
- 3. 主配置
- 二、生产者代码 Coding
- 2.1. 发送客户端
- 2.2. 确认机制
- 2.3. 消息 return机制
- 2.4. controller
- 2.5. MQ工具类
- 2.6. 常量类
- 三、消费端
- 3.1. 消费者代码
- 3.2. RabbitMQ常用命令
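<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>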
2. 配置文件
项目内部配置bootstrap.yml,
server:
  port: 8001
spring:
  application:
    # 应用名称
    name: ly-rabbitmq
  profiles:
    # 环境配置
    active: dev
  cloud:
    nacos:
      discovery:
        # 服务注册地址
        server-addr: nacos.server.com:8848
      config:
        # 配置中心地址
        server-addr: nacos.server.com:8848
        # 配置文件格式
        file-extension: yml
        # 共享配置
        shared-configs:
          - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
nacos-config服务端配置
(此处原文未给出具体的配置内容)
3. 主配置
package com.gblfy.lyrabbitmq.config;
import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the topic exchange, the queue and the binding between them.
 *
 * <p>The exchange name comes from {@link MQPrefixConst#WS_EXCEHANGE}; the queue name and the
 * binding pattern are literals defined in this class.
 */
@Configuration
public class RabbitTopicConfig {

    /** Topic exchange created with {@code durable = true} and {@code autoDelete = false}. */
    @Bean
    TopicExchange topicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(MQPrefixConst.WS_EXCEHANGE, durable, autoDelete);
    }

    /** Queue named {@code ly_mq_fai_q}, created through the single-argument constructor. */
    @Bean
    Queue hisUQ() {
        final String queueName = "ly_mq_fai_q";
        return new Queue(queueName);
    }

    /** Binds {@link #hisUQ()} to {@link #topicExchange()} with the pattern {@code ly-fai.#}. */
    @Bean
    Binding hisUQBinding() {
        final Queue queue = hisUQ();
        final TopicExchange exchange = topicExchange();
        // Messages whose routing key matches "ly-fai.#" are routed to the ly_mq_fai_q queue.
        return BindingBuilder.bind(queue).to(exchange).with("ly-fai.#");
    }
}
二、生产者代码 Coding
2.1. 发送客户端
package com.gblfy.lyrabbitmq.provider;
import com.alibaba.fastjson.JSON;
import com.gblfy.common.entity.Order;
import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import com.gblfy.lyrabbitmq.utils.MQSendMsgUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
/**
 * Producer-side service: turns order fields into a JSON string and hands it to
 * {@link MQSendMsgUtils} for publishing.
 */
@Service
public class RabbitMQProvider {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQProvider.class);

    @Autowired
    private MQSendMsgUtils mqSendMsgUtils;

    /**
     * Builds an {@link Order} from the arguments, serializes it to JSON and passes it to
     * {@link MQSendMsgUtils#snedStrMQMsg(String, String)} with the routing key
     * {@link MQPrefixConst#LY_MQ_FAI_QUERY}.
     *
     * @param orderId    order identifier
     * @param orderNum   order number
     * @param createTime creation timestamp of the order
     */
    public void sendMQContent(long orderId, String orderNum, LocalDateTime createTime) {
        final String payload = JSON.toJSONString(toOrder(orderId, orderNum, createTime));
        final String routingKey = MQPrefixConst.LY_MQ_FAI_QUERY;

        // The exchange routes the message to a queue according to the routing key.
        mqSendMsgUtils.snedStrMQMsg(routingKey, payload);

        // Logged as soon as the call above returns.
        log.info("MQ消息发送成功");
    }

    /** Assembles the order entity through its builder. */
    private static Order toOrder(long orderId, String orderNum, LocalDateTime createTime) {
        return Order.builder()
                .orderId(orderId)
                .orderNum(orderNum)
                .createTime(createTime)
                .build();
    }
}
2.2. 确认机制
package com.gblfy.lyrabbitmq.confirms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component("/confirm/iCallback")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
//日志输出
private final static Logger log = LoggerFactory.getLogger(/confirm/iCallBackListener.class);
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息队列标识ID: {}", correlationData.getId());
if (ack) {
log.info("发送消息成功: {}", ack);
} else {
log.info("发送消息失败: {}", ack);
}
}
}
2.3. 消息 return机制
package com.gblfy.lyrabbitmq.returns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
 * Return callback: logs the details of a message handed back through
 * {@link RabbitTemplate.ReturnCallback}.
 *
 * <p>Registered as the Spring bean {@code returnCallBackListener}.
 */
@Component("returnCallBackListener")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {

    private static final Logger log = LoggerFactory.getLogger(ReturnCallBackListener.class);

    /**
     * Logs the returned message's properties followed by the routing details.
     *
     * @param message    the returned message
     * @param replyCode  reply code
     * @param replyText  reply text
     * @param exchange   exchange the message was sent to
     * @param routingKey routing key the message was sent with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logMessageProperties(message);
        logRoutingDetails(exchange, routingKey, replyCode, replyText);
        // TODO: the exchange accepted the message but routing failed - persist a trace record.
    }

    /** Logs the delivery tag, content type and content encoding of the message. */
    private static void logMessageProperties(Message message) {
        log.info("DeliveryTag: {}", message.getMessageProperties().getDeliveryTag());
        log.info("ContentType: {}", message.getMessageProperties().getContentType());
        log.info("ContentEncoding: {}", message.getMessageProperties().getContentEncoding());
    }

    /** Logs the exchange, routing key, reply code and reply text. */
    private static void logRoutingDetails(String exchange, String routingKey, int replyCode, String replyText) {
        log.info("消息发送的指定交换机: {}", exchange);
        log.info("队列路由的routingKey: {}", routingKey);
        log.info("队列的响应码replyCode: {}", replyCode);
        log.info("队列的响应信息: {}", replyText);
    }
}
2.4. controller
package com.gblfy.lyrabbitmq.controller;
import com.gblfy.lyrabbitmq.provider.RabbitMQProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
/**
 * REST entry point under {@code /mq} that publishes a sample order message.
 */
@RestController
@RequestMapping("/mq")
public class MQProviderController {

    @Autowired
    private RabbitMQProvider mqProvider;

    /**
     * Handles {@code GET /mq/sendMQ}: publishes a fixed sample order stamped with the current time.
     *
     * @return the literal string {@code "OK"}
     */
    @GetMapping("/sendMQ")
    public String sendMQContent() {
        // Was written as 0001, which is an octal literal; 1L is the same value without
        // the octal trap (e.g. 0010 would be 8).
        mqProvider.sendMQContent(1L, "10", LocalDateTime.now());
        return "OK";
    }
}
2.5. MQ工具类
package com.gblfy.lyrabbitmq.utils;
import com.gblfy.lyrabbitmq.confirms.ConfirmCallBackListener;
import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import com.gblfy.lyrabbitmq.returns.ReturnCallBackListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class MQSendMsgUtils {
private final static Logger log = LoggerFactory.getLogger(MQSendMsgUtils.class);
@Autowired//注入发送消息模板
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallBackListener /confirm/iCallback;
@Autowired
private ReturnCallBackListener returnCallback;
public void snedStrMQMsg(String queueRouteKey, String msg) {
try {
log.info("交换机名称: {}, 路由routingKey: {}, 发送的消息: {} ", "EXCHANGE-CMIIP", queueRouteKey, msg);
String mID = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(mID);
// Confirm 消息确认策略
rabbitTemplate.set/confirm/iCallback(/confirm/iCallback);
// Return 消息确认策略
rabbitTemplate.setReturnCallback(returnCallback);
log.info("发送消息的路由key: {}", queueRouteKey);
log.info("发送消息的标识ID: {}", mID);
//发送消息到MQ的交换机,通知其他系统
rabbitTemplate.convertAndSend(MQPrefixConst.WS_EXCEHANGE, queueRouteKey, msg, correlationId);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.6. 常量类
package com.gblfy.lyrabbitmq.consts;
/**
 * Constants for the exchange name and routing key used by this module.
 *
 * <p>Pure constants holder: it is final and cannot be instantiated.
 */
public final class MQPrefixConst {

    /** Exchange name (regression environment). */
    public static final String WS_EXCEHANGE = "LY-REPORT-EXCHANGE";

    /** Routing key. */
    public static final String LY_MQ_FAI_QUERY = "ly-fai.query";

    private MQPrefixConst() {
        // Not instantiable.
    }
}
三、消费端
3.1. 消费者代码
package com.gblfy.lyrabbitmq.consumer;
import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
 * Consumer that receives messages from the queue, exchange and key configured under
 * {@code spring.rabbitmq.listener.str1.*}, acknowledges them and dispatches on the routing key.
 */
@Component
public class RabbitMQHandler implements ChannelAwareMessageListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQHandler.class);

    /**
     * Logs the message body, acknowledges the delivery and dispatches on the received routing key.
     *
     * @param message the delivered message
     * @param channel the channel used for {@code basicQos} and {@code basicAck}
     * @throws Exception if a channel operation fails
     */
    @Override
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.str1.queue.name}",
                    durable = "${spring.rabbitmq.listener.str1.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.str1.exchange.name}",
                    durable = "${spring.rabbitmq.listener.str1.exchange.durable}",
                    type = "${spring.rabbitmq.listener.str1.exchange.type}",
                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.str1.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.str1.key}"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        // TODO: on successful receipt, persist a consumer-side trace record for the message.

        // Decode as UTF-8 explicitly instead of relying on the platform default charset.
        // Assumes the publisher encodes string payloads as UTF-8 - TODO confirm.
        String jsonMsg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("响应报文 mResXml: {}", jsonMsg);

        // Ask the broker for a prefetch count of 1.
        // NOTE(review): issued once per message; presumably this belongs in the listener
        // container configuration instead - confirm.
        channel.basicQos(1);

        // Acknowledge this single delivery (multiple = false).
        // NOTE(review): the ack is sent before the routing-key handling below; once the TODO
        // logic is implemented, a failure there would not cause redelivery - consider acking
        // after processing.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        // TODO: parse the JSON payload according to the agreed contract
        // (an earlier draft read the fields "msgID" and "tResXml").

        String queueRouteKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info("接收的路由key: {}", queueRouteKey);
        if (MQPrefixConst.LY_MQ_FAI_QUERY.equals(queueRouteKey)) {
            // TODO: handle the query-interface message
            // TODO: save the data to the database
        } else {
            log.error("无此路由key: {}", queueRouteKey);
        }
    }
}
3.2. RabbitMQ常用命令
# 启动MQ(后台运行)
rabbitmq-server -detached



