1.进入jar包
org.springframework.boot spring-boot-starter-amqp
2.声明队列、交换机、交换机与队列绑定关系(不需要则不声明,比如简单模式和工作队列模式只需要声明队列)
@Bean public Queue orderDirectQueue() { return new Queue(MQQueueEnum.DIRECT_QUEUE_ORDER.getName(), true, false,false); }@Bean public DirectExchange orderDirectExchange() { return new DirectExchange(MQExchangeEnum.EXCHANGE_DIRECT_ORDER.getName(), true, false); } @Bean public Binding bindingDirect() { return BindingBuilder.bind(orderDirectQueue()) .to(orderDirectExchange()) .with(MQQueueEnum.DIRECT_QUEUE_ORDER.getRoutingKey()); }
3.构建消息发送者
private RabbitTemplate rabbitTemplate; @Autowired public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate.setEncoding("UTF-8"); this.rabbitTemplate.setConfirmCallback(confirmCallback); this.rabbitTemplate.setReturnCallback(returnCallback); this.rabbitTemplate.setReplyTimeout(60000); }public void sendOrder(String message){ logger.info("发送在线商城订单创建消息: {}", message); sendMsg(MQExchangeEnum.EXCHANGE_DIRECT_ORDER.getName(), MQQueueEnum.DIRECT_QUEUE_ORDER.getRoutingKey(), message, new CorrelationData(RandomUtils.getUUID())); } private void sendMsg(String exchangeName, String routingKey, Object request, CorrelationData correlationData) { rabbitTemplate.convertAndSend(exchangeName, routingKey, request, message -> { message.getMessageProperties().getHeaders().put("spring_returned_message_correlation", correlationData.getId()); message.getMessageProperties().getHeaders().put("send_ip", getLocalIp()); return message; }, correlationData); } private String getLocalIp() { try { return WebToolUtils.getLocalIP(); } catch (Exception e) { logger.error("获取本地IP异常", e); } return "0.0.0.0"; }
4.构建消息消费者
@Component public class OnlineShopOrderReceiver { private static Logger logger = LoggerFactory.getLogger(OnlineShopOrderReceiver.class); @RabbitListener(queues = {"onlineshop.order.queue.direct"}, errorHandler = "rabbitListenerErrorHandler") public void onMessage(Channel channel, Message message) throws IOException, InterruptedException { logger.info("收到在线商城订单创建{}发送的消息{}", message.getMessageProperties().getHeaders().get("send_ip"), message); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 // 处理完业务再取下一条 //channel.basicQos(1); //Thread.sleep(5000); try { Object correlation = message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); if (correlation == null) { logger.info("收到错误消息: {}", message); } else { logger.info("消息处理: {}", new String(message.getBody())); } } finally { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }}
5.配置文件需要配置MQ连接信息
# 配置RabbitMQ的基本信息 ip 端口 username password..
spring:
rabbitmq:
host: #ip
port: 5672
username: admin
password: 123456
virtual-host: /
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
retry:
enabled: true
initial-interval: 3000ms
max-attempts: 3
max-interval: 10000ms
multiplier: 1
#配置消费端
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
#最小的消费者数量
concurrency: 5
#消费失败后从新消费
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
default-requeue-rejected: true
#最大的消费者数量
max-concurrency: 5
#一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量
prefetch: 1
retry:
#监听重试是否可用
enabled: true
#第一次和第二次尝试传递消息的时间间隔
initial-interval: 1000ms
#最大重试次数
max-attempts: 5
#最大重试时间间隔
max-interval: 10000ms
#应用于上一重试间隔的乘数
multiplier: 1.0
#重试时有状态or无状态
stateless: false
RabbitMQ高级特性
1.消息的可靠性投递
springboot开启confirm,return模式
publisher-confirms: true
publisher-returns: true
final RabbitTemplate.ConfirmCallback confirmCallback =
(correlationData, ack, cause) -> logger.info("correlationData: {} ack:{}", correlationData, ack);
final RabbitTemplate.ReturnCallback returnCallback =
(message, replyCode, replyText, exchange, routingKey) ->
logger.info("消息{}不可达,exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
message.toString(), exchange, routingKey, replyCode, replyText);
消费端设置消息签收
listener:
simple:
acknowledge-mode: manual/none
manual:手动确认(推荐使用),消息手动签收后,才会从队列中清空,如果此处业务处理异常,可以不签收,调用channel.basicNack回退给队列,让队列自动重新发送(一般不这么做),如果异常一直存在的话,消息会不断的发送,造成网络开销,建议签收然后将异常的消息放入缓存或数据库,手动处理。
none:自动确认,消息会在消费端自动签收,如果消息处理异常也不会重新发送消息了,造成了消息丢失。
保证消息可靠性2.消费端限流
- 持久化--交换机持久化,队列持久化,消息持久化
- 生成端--confirm+return消息确认机制
- 消费端--消息签收处理
- Broker要高可用
可用设置一次性从队列中拉取多少条消息,消息签收一定要设置为手动签收
listener:
simple:
acknowledge-mode: manual
prefetch: 1
如果从队列中拉取全量消息,势必会对消费端造成压力
3.TTLTime To Live 存活时间,可用设置每条消息存活时间
private void sendMsg(String exchangeName, String routingKey, Object request, CorrelationData correlationData) {
rabbitTemplate.convertAndSend(exchangeName, routingKey, request, message -> {
message.getMessageProperties().getHeaders().put("spring_returned_message_correlation", correlationData.getId());
message.getMessageProperties().getHeaders().put("send_ip", getLocalIp());
// 设置TTL消息过期时间
message.getMessageProperties().setExpiration("10000");
return message;
}, correlationData);
也可以设置队列中全部消息存活时间,在声明队列时设置:
@Bean
public Queue orderDirectQueue() {
Map map = new HashMap();
// 设置队列中消息过期时间
map.put("x-message-ttl",6000);
return new Queue(MQQueueEnum.DIRECT_QUEUE_ORDER.getName(), true, false, false, map);
}
4.死信队列
DLX:Dead Letter Exchange
当消息成为死信息后,会被发送达死信交换机中。
成为死信的情况:
1.队列消息长度达到限制
2.消息没被消费者消费,且不重回队列
3.消息设置了过期时间,到达了过期时间,没有被消费
5.延时队列TTL+死信队列构成延时队列,可用于订单过期未支付的一些业务处理
声明队列和交换机
@Bean
public Queue orderDirectQueue() {
Map map = new HashMap();
// 设置队列中消息过期时间
map.put("x-message-ttl",6000);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
map.put("x-dead-letter-exchange", "deadLetterExchange");
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
map.put("x-dead-letter-routing-key", "dle.err");
return new Queue(MQQueueEnum.DIRECT_QUEUE_ORDER.getName(), true, false, false, map);
}
//声明死信Exchange
@Bean
public DirectExchange deadLetterExchange(){
return new DirectExchange("deadLetterExchange");
}
// 声明死信队列A
@Bean
public Queue deadLetterQueue(){
return new Queue("dle-queue");
}
@Bean
public Binding deadLetterQueueBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange){
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dle.err");
}
消费者:
@RabbitListener(queues = "dle-queue") public void deadLetterQueue(String msg, Channel channel, Message message) throws IOException { logger.info("死信队列消费消息: {}", msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }



