三大功能:流量削峰、应用解耦、异步处理。
erlang语言编写。
1、RabbitMQ基础
(1)MQ四大核心概念
生产者
交换机(1)
队列(n)
消费者
(2)安装
rpm -ivh erlang-21.3-1.el7.x86_64.rpm yum install socat -y rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
(3)启动
// 开机启动 chkconfig rabbitmq-server on // 本次启动 /sbin/service rabbitmq-server start
(5)安装后台
// 先关闭rabbitmq /sbin/service rabbitmq-server stop // 安装插件 rabbitmq-plugins enable rabbit_management // 防火墙状态 systemctl status firewalld // 关闭防火墙 systemctl stop firewalld
(6)添加用户
// 创建账号 rabbitmqctl add_user admin 123 // 设置用户权限 rabbitmqctl set_user_tags admin administrator // 设置用户权限 set_permissions[-p] rabbitmqctl set_permissions -p "/" admin ".*"".*"".*" // 当前用户和角色 rabbitmqctl list_users
2、Hello world模式
消费者
public class Producer {
// 队列名称
public static final String QUEUE_NAME = "hello";
// 发消息
public static void main(String[] args) throws IOException, TimeoutException {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂ip,连接RabbitMQ的队列
factory.setHost("172.18.108.173");
// 用户名和密码
factory.setUsername("briup");
factory.setPassword("briup");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发消息
String message = "hello world";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功!");
}
}
生产者
public class Consumer {
// 队列名称
public static final String QUEUE_NAME = "hello";
// 接收消息
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.18.108.173");
factory.setUsername("briup");
factory.setPassword("briup");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
3、Work Queues 工作队列
(1)轮询分发消息
一个生产者,多个消费者。
工具类:
public class RabbitUtils {
public static Channel getChannel() throws IOException, TimeoutException {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂ip,连接RabbitMQ的队列
factory.setHost("172.18.108.173");
// 用户名和密码
factory.setUsername("briup");
factory.setPassword("briup");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
com.rabbitmq.client.Channel channel = connection.createChannel();
return channel;
}
}
消费者:
public class Worker01 {
// 队列名称
public static final String QUEUE_NAME = "hello";
// 接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitUtils.getChannel();
// 声明接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息者取消消费接口回调逻辑");
};
System.out.println("C1等待接收消息");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
生产者:
public class Task01 {
// 队列名称
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 从控制台中接收消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息成功" + message);
}
}
}
(2)、应答机制
①、自动应答:不太可靠
②、手动应答:可以批量应答并且减少网络拥堵
1、Channel.basicAck(用于肯定确认):RabbitMQ已经知道该消息并且成功的处理消息,可以将其丢弃了 2、Channel.basicNack(用于否定确认) 3、Channel.basicReject(用于否定确认):比上面的少了一个批量处理的参数 // true表示批量,false表示不批量(推荐)
消息自动重新入队
mq发现某一个消费者和某一条消息未收到ack确认,就会重新入队,让其他消费者消费。
手动应答
生产者:
public class Task02 {
// 队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("utf-8"));
System.out.println("生产者发送消息" + message);
}
}
}
工具类:
public class SleepUtils {
public static void sleep(int second) {
try {
Thread.sleep(1000*second);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者:
public class Work03 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitUtils.getChannel();
System.out.println("C1等待接收消息时间较短");
// 声明接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
// 沉睡1s
SleepUtils.sleep(1);
System.out.println("接收到的消息:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息者取消消费接口回调逻辑");
};
// 采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
public class Work04 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitUtils.getChannel();
System.out.println("C2等待接收消息时间较短");
// 声明接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
// 沉睡1s
SleepUtils.sleep(30);
System.out.println("接收到的消息:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息者取消消费接口回调逻辑");
};
// 采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
(3)RabbitMQ持久化
在生产者设置
队列的持久化
消息的持久化
public class Task02 {
// 队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitUtils.getChannel();
// 需要持久化
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
// 要求消息保存到磁盘上
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
System.out.println("生产者发送消息" + message);
}
}
}
(4)不公平分发
消费者设置
// 设置参数,不设置就默认是轮询分发 Channel.basicQos(1);
public class Work03 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitUtils.getChannel();
System.out.println("C1等待接收消息时间较短");
// 声明接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
// 沉睡1s
SleepUtils.sleep(1);
System.out.println("接收到的消息:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
// 设置不公平分发
int prefetch = 1;
channel.basicQos(prefetch);
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息者取消消费接口回调逻辑");
};
// 采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
(5)预取值
不公平分发的各个消费者的分发数量。
public class Work03 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitUtils.getChannel();
System.out.println("C1等待接收消息时间较短");
// 声明接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
// 沉睡1s
SleepUtils.sleep(1);
System.out.println("接收到的消息:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
// 设置不公平分发
//int prefetch = 1;
// 预取值
int prefetch = 2;
channel.basicQos(prefetch);
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息者取消消费接口回调逻辑");
};
// 采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
4、发布确认
设置要求队列必须持久化
设置要求队列中的消息必须持久化
发布确认:mq告诉生产者消息已经保存到磁盘
(1)开启发布确认
Channel channel = RabbitUtils.getChannel(); // 开启发布确认 channel.confirmSelect();
单个确认发布:同步确认发布,发布速度特别慢
批量确认发布:发布出现故障,不知道是哪个出现了问题
异步确认发布:性价比高,可靠性强(回调函数来确认),是否发送成功均有应答
public class ConfirmMessage {
// 批量发消息的个数
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception{
// 1、单个
/confirm/iMessage.publishMessageIndividually(); //722ms
// 2、批量
/confirm/iMessage.publishMessageBatch(); //147ms
// 3、异步
/confirm/iMessage.publishMessageAsync(); //62ms
}
// 1、单个确认
public static void publishMessageIndividually() throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明队列
String queuqName = UUID.randomUUID().toString();
channel.queueDeclare(queuqName, true, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long begin = System.currentTimeMillis();
// 批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queuqName, null, message.getBytes());
boolean flag = channel.waitForConfirms();
// 单个消息马上进行发布确认
if(flag) {
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "单个确认,耗时" + (end-begin) + "ms");
}
// 2、批量确认
public static void publishMessageBatch() throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明队列
String queuqName = UUID.randomUUID().toString();
channel.queueDeclare(queuqName, true, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long begin = System.currentTimeMillis();
// 批量确认消息大小
int batchSize = 100;
// 批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queuqName, null, message.getBytes());
if (i%batchSize == 0) {
// 批量消息发布确认
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "批量确认,耗时" + (end-begin) + "ms");
}
// 3、异步确认
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明队列
String queuqName = UUID.randomUUID().toString();
channel.queueDeclare(queuqName, true, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long begin = System.currentTimeMillis();
// 准备消息的监听器,监听那些消息成功了,哪些失败了
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
System.out.println("确认的消息:" + deliveryTag);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("未确认的消息:" + deliveryTag);
};
channel.addConfirmListener(ackCallback, nackCallback);
// 批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queuqName, null, message.getBytes());
// 异步发布确认
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "异步确认,耗时" + (end-begin) + "ms");
}
}
如何处理异步未处理消息:把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentlinkQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。
1、在发送部分记录下所有要发送的消息
2、在消息确认成功部分删除已经发送的消息
3、在消息确认失败部分打印未确认的消息
// 3、异步确认
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明队列
String queuqName = UUID.randomUUID().toString();
channel.queueDeclare(queuqName, true, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long begin = System.currentTimeMillis();
ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>();
// 准备消息的监听器,监听那些消息成功了,哪些失败了
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
// ***********2、在消息确认成功部分删除已经发送的消息***********
ConcurrentNavigableMap confirmed =
outstanding/confirm/is.headMap(deliveryTag);
} else {
outstanding/confirm/is.remove(deliveryTag);
}
System.out.println("确认的消息:" + deliveryTag);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
// ***********3、在消息确认失败部分打印未确认的消息***********
String message = outstanding/confirm/is.get(deliveryTag);
System.out.println("未确认的消息" + message + "未确认的消息的标记:" + deliveryTag);
};
channel.addConfirmListener(ackCallback, nackCallback);
// 批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queuqName, null, message.getBytes());
// 异步发布确认
// ***********1、在发送部分记录下所有要发送的消息***********
outstanding/confirm/is.put(channel.getNextPublishSeqNo(), message);
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "异步确认,耗时" + (end-begin) + "ms");
}
(2)三种发布确认比较
单个:简单,但吞吐量非常有限;
批量:简单,合理的吞吐量,一旦出现问题就很难推断出是那条消息出现了问题;
异步:最佳性能和资源使用,出现问题能很好控制,实现较难。
5、交换机
RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上生产者只能将消息发送到交换机。
交换机:①接收来自生产者的消息;②将消息推入队列(routingKey / bindingKey)。
(1)交换机类型
直接/路由(direct)
主题(topic)
标题(headers)
扇出(fanout)
无名(默认)
(2)临时队列
features是否为D:一旦断开消费者的连接,队列将被自动删除。
// 创建方式 String queueName = channel.queueDeclare().getQueue();
(3)绑定(binding)
交换和队列直接的绑定。
(4)扇出
两个RoutingKey相同。
将接收到的所有消息广播到它知道的所有队列中。
消费者
public class ReceiveLog01 {
// 交换机的名称
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 声明临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue, EXCHANGE_NAME, "");
System.out.println("等待接收消息。。。。。。");
// 接收消息
DeliverCallback deliverCallback = (customerTag, message) -> {
System.out.println("ReceiveLog01打印接收的消息:" + message);
};
channel.basicConsume(queue, true, deliverCallback, consumerTage -> {});
}
}
生产者:
public class EmitLog {
// 交换机的名称
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
(5)直接/路由
RoutingKey不同
消费者
public class ReceiveLogsDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明临时队列
channel.queueDeclare("console", false,false,false,null);
// 绑定交换机和队列
channel.queueBind("console", EXCHANGE_NAME, "info");
channel.queueBind("console", EXCHANGE_NAME, "warning");
System.out.println("等待接收消息。。。。。。");
// 接收消息
DeliverCallback deliverCallback = (customerTag, message) -> {
System.out.println("ReceiveLogsDirect01打印接收的消息:" + message);
};
channel.basicConsume("console", true, deliverCallback, consumerTage -> {});
}
}
生产者
public class DirectLogs {
// 交换机的名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("utf-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
(6)topic
routing_key:单词列表,以点号隔开
*(星号)可以代替一个单词
#(井号)可以代替0个或者多个
注意:
当一个队列绑定键是#,那么这个队列接收所有数据 ,(fanout);
如果队列绑定键当中,没有#和*出现,那么该队列绑定类型就是direct。
消费者:
public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明临时队列
String queue = "Q1";
channel.queueDeclare(queue, false,false,false,null);
// 绑定交换机和队列
channel.queueBind(queue, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息。。。。。。");
// 接收消息
DeliverCallback deliverCallback = (customerTag, message) -> {
System.out.println("ReceiveLogsTopic01打印接收的消息:" + message.getBody());
System.out.println("接收队列" + queue + "绑定键" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queue, true, deliverCallback, consumerTage -> {});
}
}
生产者:
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
Map bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "Q1和Q2接收到");
bindingKeyMap.put("lazy.orange.elephant", "Q1和Q2接收到");
bindingKeyMap.put("quick.orange.fox", "Q1接收到");
bindingKeyMap.put("lazy.brown.fox", "Q2接收收到");
bindingKeyMap.put("lazy.pink.rabbit", "Q2接收到一次");
bindingKeyMap.put("quick.brown.fox", "丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "Q2接收到");
for (Map.Entry bindingKeyEntry : bindingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));
System.out.println("生产者发送消息:" + message);
}
}
}
6、死信队列
(1)死信概念
无法被消费的消息。
(2)死信来源
消息TTL过期
队列达到最大长度(队列满了,无法再添加数据到mq中)
消息被拒绝(basic.reject或basic.nack)并且requence=false
(3)代码体现
消息TTL过期
消费者1:
public class Consumer01 {
// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通队列
Map arguments = new HashMap<>();
// 过期时间
// arguments.put("x-message-ttl", 10000);
// 正常队列设置死信队列的交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
// 死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 绑定交换机和队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Consumer01接收到:" + new String(message.getBody(), "utf-8"));
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, sonsumerTag -> {});
}
}
生产者:
public class Producer {
// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitUtils.getChannel();
// 死信消息,设置ttl时间, time to live
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
}
}
}
消费者2:
public class Consumer02 {
// 死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Consumer02接收到:" + new String(message.getBody(), "utf-8"));
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {});
}
}
队列达到最大长度
生产者:
public class Producer {
// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitUtils.getChannel();
// 死信消息,设置ttl时间, time to live
// AMQP.BasicProperties properties = new AMQP.BasicProperties()
// .builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
}
}
}
消费者1:
public class Consumer01 {
// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通队列
Map arguments = new HashMap<>();
// 过期时间
// arguments.put("x-message-ttl", 10000);
// 正常队列设置死信队列的交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-letter-routing-key", "lisi");
// 设置正常队列的长度限制
arguments.put("x-max-length", 6);
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
// 死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 绑定交换机和队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Consumer01接收到:" + new String(message.getBody(), "utf-8"));
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {});
}
}
消息被拒
消费者1:
public class Consumer01 {
// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 普通队列
Map arguments = new HashMap<>();
// 过期时间
// arguments.put("x-message-ttl", 10000);
// 正常队列设置死信队列的交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-letter-routing-key", "lisi");
// 设置正常队列的长度限制
// arguments.put("x-max-length", 6);
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
// 死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 绑定交换机和队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "utf-8");
if (msg.equals("info5")) {
System.out.println(msg + "被拒收");
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("Consumer01接收到:" + message);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});
}
}
7、延迟队列
(1)整合SpringBoot
配置类代码
@Configuration
public class TtlQueueConfig {
// 普通交换机
private static final String X_EXCHANGE = "X";
// 死信交换机
private static final String Y_DEAD_EXCHANGE = "Y";
// 普通队列
private static final String QUEUE_A = "QA";
private static final String QUEUE_B = "QB";
// 死信队列
private static final String DEAD_QUEUE_D = "QD";
// 声明普通交换机
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明死信交换机
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_EXCHANGE);
}
// 声明普通队列,过期时间10秒
@Bean("queueA")
public Queue queueA() {
Map arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-letter-routing-key", "YD");
// 设置ttl
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
@Bean("queueB")
public Queue queueB() {
Map arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-letter-routing-key", "YD");
// 设置ttl
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
// 声明死信队列
@Bean("deadQueueD")
public Queue deadQueueD() {
return QueueBuilder.durable(DEAD_QUEUE_D).build();
}
// 绑定rountingKey
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding queueDBindingY(@Qualifier("deadQueueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
生产者
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 开始发消息
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条消息给两个TTL队列:{}", new Date().toString(), message);
rabbitTemplate.convertAndSend("X", "XA", "来自ttl为10s的消息:"+message);
rabbitTemplate.convertAndSend("X", "XB", "来自ttl为40s的消息:"+message);
}
}
消费者
@Component
@Slf4j
public class DeadLetterQueueConsumer {
// 接收消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
log.info("当前时间:{}, 收到的死信队列的消息:{}", new Date().toString(), msg);
}
}
(2)延迟队列优化
死信队列,再添加一个队列不设置延迟时间。
private static final String QUEUE_C = "QC";
@Bean("queueC")
public Queue queueC() {
Map arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
生产者:
// 开始发消息, TTL
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
log.info("当前时间:{},发送一条时长{}ms的ttl消息给队列QC:{}", new Date().toString(), ttlTime, message);
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
在消息属性上设置ttl:RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长很长,第二个消息的延时时长很短,第二个消息并不会优先得到执行。
(3)RabbitMQ插件实现延迟队列
配置:
public class DelayQueueConfig {
// 交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
// 队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
// routingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 声明交换机
@Bean("delayExchange")
public CustomExchange delayExchange() {
Map arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
// 声明队列
@Bean("delayedQueue")
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 绑定
@Bean
public Binding delayedQueueBindingDelayedExchange(
@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayExchange") CustomExchange delayExchange
) {
return BindingBuilder.bind(delayedQueue).to(delayExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者:
// 开始发消息, TTL
@GetMapping("sendExpirationMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
log.info("当前时间:{},发送一条时长{}ms的消息给延迟队列delayed.queue:{}", new Date().toString(), delayTime, message);
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE_NAME, DelayQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
消费者:
@Component
@Slf4j
public class DelayQueueConsumer {
// 监听消息
@RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message message) throws Exception{
String msg = new String(message.getBody());
log.info("当前时间:{}, 收到的死信队列的消息:{}", new Date().toString(), msg);
}
}
8、发布确认高级
配置:
public class ConfirmConfig {
// 交换机
public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i_exchange";
// 队列
public static final String /confirm/i_QUEUE_NAME = "/confirm/i_queue";
// routingKey
public static final String /confirm/i_ROUTING_KEY = "/confirm/i_routing_key";
// 声明交换机
@Bean("/confirm/iExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(/confirm/i_EXCHANGE_NAME);
}
// 声明队列
@Bean("/confirm/iQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build();
}
// 绑定
@Bean
public Binding queueBindingExchange(@Qualifier("/confirm/iQueue") Queue queue,
@Qualifier("/confirm/iExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(/confirm/i_ROUTING_KEY);
}
}
生产者:
@Slf4j
@RestController
@RequestMapping("//confirm/i")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发消息
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
rabbitTemplate.convertAndSend(/confirm/iConfig./confirm/i_EXCHANGE_NAME, /confirm/iConfig./confirm/i_ROUTING_KEY, message);
log.info("发送消息内容:{}", message);
}
}
消费者:
@Component
@Slf4j
public class Consumer {
@RabbitListener(queues = /confirm/iConfig./confirm/i_QUEUE_NAME)
public void receiveConfirmMessage(Message message) {
String msg = new String(message.getBody());
log.info("接收到的队列/confirm/i.queue消息:{}", msg);
}
}
(1)回调接口
注意:必须通过@PostConstruct注解注入接口
(2)交换机确认
配置文件需要添加:开启生产者确认机制
spring.rabbitmq.publisher-/confirm/i-type=correlated
RabbitTemplate./confirm/iCallback
交换机确认回调方法,confirm()
1、发消息,交换机接收到了,回调
correlationData,保存回调消息的ID及相关信息
交换机收到消息,ack=true
cause null
2、发消息,交换机接收失败了,回调
correlationData,保存回调消息的ID及相关信息
交换机收到消息,ack=false
cause,失败的原因
(3)回退消息
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
配置文件需要添加:开启回退消息给生产者
spring.rabbitmq.publisher-returns=true
接口:RabbitTemplate.ReturnsCallback
方法:returnedMessage(),在消息在传递过程中不可达目的地时将消息返回给生产者。
(4)备份交换机
添加备份交换机、备份队列、报警队列。
9、RabbitMQ其他知识点
(1)幂等性
消息被重复消费
设置全局id(UUID、时间戳)。
唯一ID+指纹吗机制:时间戳——唯一信息码;
redis的原子性,setnx命令(推荐)。
(2)优先级队列
订单催付:订单优先级(0-255,数字越大越优先执行)
①、队列中添加优先级
②、消息中添加优先级
③、消费者需要等待消息先发送到队列才去消费
(3)惰性队列
消息保存在内存中还是在磁盘中。
正常情况下,消息保存在内存中,消费速度快;
惰性队列中:消息保存在磁盘中,消费速度慢,适用于消费者下线,宕机时。
两种模式:default和lazy。
惰性队列消耗内存小。
10、RabbitMQ集群
(1)备份镜像
添加策略可以备份镜像,防止数据丢失。
参数:指定模式。
就算整个集群只剩下一台机器了,依然能消费队列里面的消息,说明队列里面的消息被镜像队列传递到相应的机器里面去了。
(2)Haproxy+Keepalive实现高可用
Haproxy+Keepalive实现高可用:建立备机,当主机宕机后,地址就会漂移到备机上,备机也会问主机的在线状态。
高可用:主机宕机备机可以接管主机的工作。
(3)Federation Exchange(联合交换机)
安装federation插件:rabbitmq-plugins enable rabbitmq_federation_management
FederationExchange原理
以交换机为节点,先准备上下游两个交换机节点,在上游交换机配置连接下游交换机。
FederationExchange实现
准备交换机和队列,设置上游,设置规则,添加上游策略。
FederationQueue实现
联邦队列可以将不同地区的数据进行同步。
(4)Shovel
Shovel:数据备份,将源端的数据转发给目的端,负责连接源和目的地,负责消息的读写和负责连接失败的处理。
安装shovel插件:rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_managment



