- RabbitMQ整合SpringBoot
- 导入Maven依赖
- 配置文件
- 队列与死信队列
- 通过配置文件 添加Bean创建交换机以及队列
- config
- controller
- consumer
- 手动ACK应达
- 消费者案例
- RabbitMQ的交换机插件
- 使用RabbitMQ中的插件来解决以上的生产者指定过期时间的弊端
- 插件下载以及Docker安装
- Docker安装
- 创建插件类型的交换机
- config
- controller
- sonsumer
- 配置回调
- callback
- 备份交换机
- Config
- controller
- consumer
- 优先级队列
- config
- controller
- consumer
RabbitMQ官网
导入Maven依赖配置文件org.springframework.boot spring-boot-starter-amqp org.springframework.amqp spring-rabbit-test test
# RabbitMQ
spring:
rabbitmq:
port: 5672
addresses: 116.62.113.241
virtual-host: /
username: guest
password: guest
# 确认消息已发送大宋交换机(Exchange)选中确认模式为交互
publisher-/confirm/i-type: correlated
# 开启发送端消息抵达队列的确认
publisher-returns: true
# 只要抵达队列,以异步发送优先回调我们这个returnsfirm
template:
mandatory: true
# 手动ack消息
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: auto
# 并发消费者初始化值
concurrency: 1
# 并发消费者的最大值
max-concurrency: 10
# 每个消费者每次监听时可拉取处理的消息数量
# 在单个请求中处理的消息个数,他应该大于等于事务数量(unAck的最大数量)
prefetch: 1
# 是否支持重试
retry:
enabled: true
- spring.rabbitmq.publisher-/confirm/i-type
- NONE 禁用发布确认模式,是默认值
- CORRELATED 发布消息成功到交换器后会触发回调方法
- SIMPLE
- 经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法
- 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
通过配置文件 添加Bean创建交换机以及队列 config创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间 由生产者指定时间
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class CreateMQConfig {
// 普通交换机的名称
public static final String ORDINARY_EXCHANGE_X = "X";
// 普通队列名称
public static final String ORDINARY_QUEUE_A = "QA";
public static final String ORDINARY_QUEUE_B = "QB";
// 死信交换机的名称
public static final String DEAD_LETTER_EXCHANGE_Y = "Y";
// 死信队列名称
public static final String DEAD_LETTER_QUEUE = "QD";
@Bean
public DirectExchange XExchange() {
return new DirectExchange(ORDINARY_EXCHANGE_X);
}
@Bean
public DirectExchange YExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE_Y);
}
@Bean
public Queue queueA() {
Map arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_Y);
arguments.put("x-dead-letter-routing-key", "YD");
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(ORDINARY_QUEUE_A).withArguments(arguments).build();
}
@Bean
public Queue queueB() {
Map arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_Y);
arguments.put("x-dead-letter-routing-key", "YD");
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(ORDINARY_QUEUE_B).withArguments(arguments).build();
}
@Bean
public Queue queueQD() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
@Bean
public Binding queueABindingX(Queue queueA, DirectExchange XExchange) {
return BindingBuilder.bind(queueA).to(XExchange).with("XA");
}
@Bean
public Binding queueBBindingX(Queue queueB, DirectExchange XExchange) {
return BindingBuilder.bind(queueB).to(XExchange).with("XB");
}
@Bean
public Binding queueQDBindingY(Queue queueQD, DirectExchange YExchange) {
return BindingBuilder.bind(queueQD).to(YExchange).with("YD");
}
// ----------增加一个队列 不设置过期时间 过期时间由生产者指定 绑定 X 交换机 通过路由 XC 到QC队列----
// 普通队列名称
public static final String ORDINARY_QUEUE_QC = "QC";
@Bean
public Queue queueQC() {
Map arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_Y);
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(ORDINARY_QUEUE_QC).withArguments(arguments).build();
}
@Bean
public Binding queueQCBindingX(Queue queueQC, DirectExchange XExchange) {
return BindingBuilder.bind(queueQC).to(XExchange).with("XC");
}
}
controller
sendMessage 向X交换机发送消息 分别通过不同的路由 到指定队列中
sendMessageAssignTTL 由生产者指定消息的发送时间有弊端 因为RabbitMQ只会检查第一个消息是否过期 可以通过插件延迟队列进行弥补
import com.llc.rabbitmq.common.api.R;
import com.llc.rabbitmq.config.CreatePlugInDelayedMQConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Api(tags = "RabbitMQ消息发送API")
@Slf4j
@RestController
@RequestMapping("/v1/rabbitmq")
public class SendMsgController {
private final
RabbitTemplate rabbitTemplate;
public SendMsgController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@ApiOperation(value = "向RabbitMQ中发送消息")
@ApiImplicitParam(name = "message", value = "消息内容", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path")
@GetMapping("/send/{message}")
public R sendMessage(@PathVariable String message) {
log.info("当前时间:{} --- 发送一条消息给两个TTL队列 | {}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10S的队列" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为40S的队列" + message);
return R.ok("消息发送成功");
}
@ApiOperation(value = "向RabbitMQ中发送消息,指定消息的过期时间TTL")
@ApiImplicitParams({
@ApiImplicitParam(name = "message", value = "消息内容", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path"),
@ApiImplicitParam(name = "ttl_time", value = "过期时间", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path")
})
@GetMapping("/send/{message}/{ttl_time}")
public R sendMessageAssignTTL(@PathVariable String message, @PathVariable String ttl_time) {
log.info("当前时间:{} --- 发送一条消息给QC队列 | {} | 过期时间 - {}MS", new Date(), message, ttl_time);
rabbitTemplate.convertAndSend("X", "XC", "消息自定义过期时间为" + ttl_time + "的队列" + message, (msg) -> {
// 设置消息消费的延迟时间
msg.getMessageProperties().setExpiration(ttl_time);
return msg;
});
return R.ok("消息发送成功");
}
}
consumer
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
// 指定监听的队列名
@RabbitListener(queues = {"QD"})
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{} --- 收到死信队列的消息 | {}", new Date(), msg);
}
}
手动ACK应达
-
配置开启
# 手动应答 spring.listener.simple.acknowledge-mode=manual
在消费者中手动进行消息应答
channel.basicAck 签收
channel.basicNack 拒签
import com.llc.rabbitmq.config.PriorityMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class PriorityQueueConsumer {
@RabbitListener(queues = {PriorityMQConfig.PRIORITY_QUEUE_NAME})
public void receivePriorityQueue(Message message, String body, Channel channel) {
log.info("《优先级》队列消费者 message==>[{}],body==>[{}]", message, body);
//获取交货标签 内容安顺序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
channel.basicAck(deliveryTag, false);
if (deliveryTag % 2 == 0) {
channel.basicNack(deliveryTag, false, true);
// channel.basicReject(); //另外一种拒签方法
}
} catch (IOException e) {
log.error("网络终端了", e);
}
}
}
RabbitMQ的交换机插件
使用RabbitMQ中的插件来解决以上的生产者指定过期时间的弊端
插件下载以及Docker安装
解决队列先进先出的情况,比如第一个进到队列的到期时间是30分钟,第二个进入队列的到期时间是20分钟,但是由于队列的先进先出原则,第二个进入的会被阻塞了,等到第一个到期了才会被一起延迟处理
原理:之前的的延迟是在队列中做的通过设置队列的TTL来延迟推送给消费者
而现在的延迟是在交换机中做,交换机延迟推送给队列,队列接收到消息直接推送给消费者,来弥补队列做延迟的弊端
-
插件 rabbitmq_delayed_message_exchange
插件下载地址
docker run --name rabbitmq --restart=always -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 -d rabbitmq:management # 4369,25672 (Erlang发现&集群端口) | 5672,5671(AMQP端口) | 15672(web管理后台端口) | 61613,61614( STOMP协议端口) | 1883,8883(MQTT协议端口) # 默认密码 guest # 访问地址 http://127.0.0.1:15672 # 安装插件 ## 将插件上传到服务器中 docker cp /dockeres/rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins ## 进入容器启动插件 docker exec -it rabbitmq bash ## 启动插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange ## 禁用插件 rabbitmq-plugins disable rabbitmq_delayed_message_exchange ## 重启容器 docker restart rabbitmq创建插件类型的交换机 config
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class CreatePlugInDelayedMQConfig {
public static final String DEFAULT_EXCHANGE_NAME = "delayed.exchange";
public static final String DEFAULT_QUEUE_NAME = "delayed.queue";
public static final String DEFAULT_ROUTING_KEY = "delayed.routingKey";
@Bean
public CustomExchange delayedExchange() {
Map arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new CustomExchange(DEFAULT_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(DEFAULT_QUEUE_NAME).build();
}
@Bean
public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DEFAULT_ROUTING_KEY).noargs();
}
}
controller
import com.llc.rabbitmq.common.api.R;
import com.llc.rabbitmq.config.CreatePlugInDelayedMQConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Api(tags = "RabbitMQ消息发送API")
@Slf4j
@RestController
@RequestMapping("/v1/rabbitmq")
public class SendMsgController {
private final
RabbitTemplate rabbitTemplate;
public SendMsgController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@ApiOperation(value = "向RabbitMQ中发送消息,基于插件的延迟对列")
@ApiImplicitParams({
@ApiImplicitParam(name = "message", value = "消息内容", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path"),
@ApiImplicitParam(name = "delay_time", value = "延迟时间", required = true, dataType = "Integer", dataTypeClass = Integer.class, paramType = "path")
})
@GetMapping("/send/plug_in/{message}/{delay_time}")
public R sendMessagePlugInAssignTTL(@PathVariable String message, @PathVariable Integer delay_time) {
log.info("当前时间:{} --- 发送一条消息给QC队列 | {} | 过期时间 - {}MS", new Date(), message, delay_time);
rabbitTemplate.convertAndSend(CreatePlugInDelayedMQConfig.DEFAULT_EXCHANGE_NAME, CreatePlugInDelayedMQConfig.DEFAULT_ROUTING_KEY, message, msg -> {
// 设置延迟时间
msg.getMessageProperties().setDelay(delay_time);
return msg;
});
return R.ok("消息发送成功");
}
}
sonsumer
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DelayQueueConsumer {
@RabbitListener(queues = {"delayed.queue"})
public void delayQueueConsumer(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{} --- 收到延迟队列的消息 | {}", new Date(), msg);
}
}
配置回调
-
消息准确发送到交换机的确认回调
-
消息未抵达队列的确认回调
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class MessageSendExchangeCallback {
private final RabbitTemplate rabbitTemplate;
public MessageSendExchangeCallback(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
//@PostConstruct--RabbitMqConfig方法创建完成后,执行这个方法
@PostConstruct
public void initRabbitTemplate() {
//设置确认回调
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
log.warn("[[[交换机消息抵达确认]]] /confirm/i...[{}]===>ack[{}]==>cause[{}]]", correlationData, ack, cause);
}
});
//设置消息未抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println();
log.warn("[[[消息未抵达队列]]] message...[{}]===>replyCode[{}]==>replyText[{}]==>exchange[{}]==>routingKey[{}]", message, replyCode, replyText, exchange, routingKey);
}
});
}
}
备份交换机
Config消息推送失败后将消息投递给备份交换机,通过备份交换机来进行对应处理
⚠️mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先 级高,经过上面结果显示答案是备份交换机优先级高。
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class /confirm/iConfig {
public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";
public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";
public static final String /confirm/i_ROUTING_KEY = "/confirm/i.routingKey";
@Bean
public DirectExchange /confirm/iExchange() {
return ExchangeBuilder.directExchange(/confirm/i_EXCHANGE_NAME)
.durable(true)
//设置该交换机的备份交换机
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
.build();
}
@Bean
public Queue /confirm/iQueue() {
return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(Queue /confirm/iQueue, DirectExchange /confirm/iExchange) {
return BindingBuilder.bind(/confirm/iQueue).to(/confirm/iExchange).with(/confirm/i_ROUTING_KEY);
}
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
// 备份队列
public static final String BACKUP_QUEUE_NAME = "backup.queue";
// 警告队列
public static final String WARNING_QUEUE_NAME = "warning.queue";
@Bean
public FanoutExchange backupExchange() {
return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE_NAME).build();
}
@Bean
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
@Bean
public Binding backupQueueBindingBackupExchange(Queue backupQueue, FanoutExchange backupExchange) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
@Bean
public Binding warningQueueBindingBackupExchange(Queue warningQueue, FanoutExchange backupExchange) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
controller
import com.llc.rabbitmq.common.api.R;
import com.llc.rabbitmq.config./confirm/iConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@Api(tags = "RabbitMQ消息发送并确认API")
@Slf4j
@RestController
@RequestMapping("/v1/rabbitmq//confirm/i")
public class ProducerController {
private final RabbitTemplate rabbitTemplate;
public ProducerController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@ApiOperation(value = "向RabbitMQ中发送消息,并确认消息发送")
@ApiImplicitParams({
@ApiImplicitParam(name = "message", value = "消息内容", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path")
})
@GetMapping("/send/{message}")
public R sendMessage/confirm/i(@PathVariable String message) {
// 构造一个消息确认回调的相关数据
String msgId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData();
correlationData.setId(msgId);
correlationData.setReturnedMessage(
MessageBuilder.withBody(message.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setCorrelationId(msgId)
.build()
);
rabbitTemplate.convertAndSend(/confirm/iConfig./confirm/i_EXCHANGE_NAME, /confirm/iConfig./confirm/i_ROUTING_KEY+"1", message, correlationData);
log.info("发送消息的内容 | {}", message);
return R.ok("消息发送成功");
}
}
consumer
import com.llc.rabbitmq.config./confirm/iConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class /confirm/iExchangeConsumer {
@RabbitListener(queues = {/confirm/iConfig./confirm/i_QUEUE_NAME})
public void receive/confirm/iQueue(Message message, String body, Channel channel) {
log.info("确认队列消费者 message==>[{}],body==>[{}]", message, body);
}
@RabbitListener(queues = {/confirm/iConfig.BACKUP_QUEUE_NAME})
public void receiveBackupQueue(Message message, String body, Channel channel) {
log.info("备份交换机 《备份》队列消费者 message==>[{}],body==>[{}]", message, body);
}
@RabbitListener(queues = {/confirm/iConfig.WARNING_QUEUE_NAME})
public void receiveWarningQueue(Message message, String body, Channel channel) {
log.info("备份交换机 《警告》队列消费者 message==>[{}],body==>[{}]", message, body);
}
}
优先级队列
config在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如 果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall 商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创 造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存 放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,
所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。
⚠️要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class PriorityMQConfig {
public static final String PRIORITY_EXCHANGE_NAME = "priority.exchange";
public static final String PRIORITY_QUEUE_NAME = "priority.queue";
public static final String PRIORITY_ROUTING_KEY = "priority.routingKey";
@Bean
public DirectExchange priorityExchanger() {
return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE_NAME).durable(true).build();
}
@Bean
public Queue priorityQueue() {
Map arguments = new HashMap<>();
// 官方运行的范围优先级为0-255,此处设置10
arguments.put("x-max-priority", 10);
return QueueBuilder.durable(PRIORITY_QUEUE_NAME).withArguments(arguments).build();
}
@Bean
public Binding priorityQueueBindingExchange(Queue priorityQueue, DirectExchange priorityExchanger) {
return BindingBuilder.bind(priorityQueue).to(priorityExchanger).with(PRIORITY_ROUTING_KEY);
}
}
controller
import com.llc.rabbitmq.common.api.R;
import com.llc.rabbitmq.config.PriorityMQConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Api(tags = "RabbitMQ优先级消息API")
@Slf4j
@RestController
@RequestMapping("/v1/rabbitmq/priority")
public class PriorityController {
private final
RabbitTemplate rabbitTemplate;
public PriorityController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@ApiOperation(value = "向RabbitMQ中发送消息10条消息")
@ApiImplicitParam(name = "message", value = "消息内容", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path")
@GetMapping("/send/{message}")
public R sendMessage(@PathVariable String message) {
for (int i = 0; i < 10; i++) {
// 随机数生产
int level = (int) (Math.random() * 10);
log.info("消息==>[{}-{}],优先级==>[{}]", message, i, level);
rabbitTemplate.convertAndSend(
PriorityMQConfig.PRIORITY_EXCHANGE_NAME,
PriorityMQConfig.PRIORITY_ROUTING_KEY,
"优先级消息==>[" + message + "]" + i + "优先级===>[" + level + "]",
mes -> {
mes.getMessageProperties().setPriority(level);
return mes;
});
}
return R.ok("消息发送成功");
}
}
consumer
import com.llc.rabbitmq.config.PriorityMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class PriorityQueueConsumer {
@RabbitListener(queues = {PriorityMQConfig.PRIORITY_QUEUE_NAME})
public void receivePriorityQueue(Message message, String body, Channel channel) {
log.info("《优先级》队列消费者 message==>[{}],body==>[{}]", message, body);
}
}



