一、简介二、安装部署
2.1 yum方式安装2.2 docker方式安装 三、RabbitMQ架构
消息投递消费流程(重要): 四、RabbitMQ通讯方式
4.0 建立Maven项目4.1 HelloWord
4.1.1 创建连接工具类4.1.2 生产消息4.1.3 监听消息 4.2 Work Queue
4.2.1 一个生产者4.2.2 两个消费者轮询消费 4.3 Publish/Subscribe(FANOUT类型)
4.3.1 生产者(创建FANOUT类型交换机)4.3.2 两个消费者一起消费 4.4 Routing (DIRECT类型)
4.4.1 生产消息4.4.2 监听消息 4.5 Topic (Topic主题模式)
4.5.1 生产者4.5.2 消费者 4.6 RPC
4.6.1 client客户端4.6.2 server服务端 4.7 Headers 五、RabbitMQ整合SpringBoot
5.1 构建SpringBoot项目5.2 声明交换机、队列、绑定5.3 发送消息5.4 监听消息 六、RabbitMQ保证消息可靠性
6.1 保证消息到达交换机6.2 保证消息路由到队列6.3 保证队列可以持久化消息完整消息生产者代码(确保消息到达队列)总结123实现生产者代码6.4 保证消费者可以正常消费消息6.5 使用SpringBoot实现消息可靠完整SpringBoot确保消息到达队列生产者代码 七、死信队列&延时交换机
7.0 构造死信队列7.1 消息被拒绝或者Nack7.2 消息过期7.3 消息超过队列最大长度7.4 延迟交换机(RabbitMQ Plugins)7.5 声明延时交换机7.6 发送延时消息 八、集群高可用
一、简介本文内容主要包括:
- RabbitMQ的安装部署AMQP架构RabbitMQ的通信方式RabbitMQ整合SpringBootRabbitMQ保证消息可靠性RabbitMQ死信队列和延时交换机RabbitMQ集群高可用
源码已上传Github仓库:https://github.com/Liu-Shihao/rabbitmq-java-api
同步Gitee仓库:https://gitee.com/L1692312138/rabbitmq
- RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,Erlang是一种通用的面向并发的编程语言,所以RabbitMQ的性能特别快。RabbitMQ 支持海量的插件实现一些特殊的功能,例如:延时交换机等。
因为 RabbitMQ 需要 erlang 环境的支持,所以必须先安装 erlang 。
# 安装 erlang 对应的 yum repo curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash # 安装 erlang 环境 yum install erlang-22.3.3-1.el7.x86_64 # 根据提示 再次执行如下命令即可 yum load-transaction /tmp/yum_save_tx.2020-05-14.22-21.n0cwzm.yumtx # 测试 erlang 是否安装成功 erl
# 安装 rabbitmq 对应的 yum repo curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash # 安装 rabbitmq yum install rabbitmq-server-3.8.3-1.el7.noarch
# 设置开机启动 chkconfig rabbitmq-server on # 启动服务 systemctl start rabbitmq-server.service # 开启WEB可视化管理插件 rabbitmq-plugins enable rabbitmq_management # 添加用户 rabbitmqctl add_user 用户名 密码 rabbitmqctl set_user_tags 用户名 administrator # 访问可视化管理界面 浏览器输入: 你的服务器IP:156722.2 docker方式安装
国内 Docker 镜像网站: https://hub.daocloud.io/
需要安装Docker环境。
准备docker-compose.yml文件
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:3.8.5
container_name: rabbitmq
restart: always
volumes:
- ./data/:/var/lib/rabbitmq/
ports:
- 5672:5672
- 15672:15672
# 在Linux内部执行: curl localhost:5672 出现AMQP 安装成功 # 开启可视化界面 # 进入容器内部 docker exec -it rabbitmq bash # 进入 /opt/rabbitmq ,找到 sbin 和 plugins 文件夹 # 在plugins 目录下会有rabbitmq_managemengt插件,然后在哎sbin目录下执行 ./rabbitmq-plugins enable rabbitmq_managemeng # 访问可视化管理界面 浏览器输入: 你的服务器IP:15672 用户名和密码 都是guest
AMQP协议:
RabbitMQ完整架构:
- Publisher (生产者)与 Virtual Host 建立连接 Connection;Publisher(生产者)与 Exchange (交换机)建立通道 Channel;Exchange (交换机)通过 routes (路由) 规则 将消息路由到某一个或者多个Queue(队列)中;Consumer(消费者)与 Virtual Host 建立连接 Connection ;Consumer(消费者)与 Queue(队列)建立通道 Channel;Consumer(消费者)拿到 Queue(队列)的消息进行消费;
官网文档:https://www.rabbitmq.com/getstarted.html
GetStart文档:https://www.rabbitmq.com/getstarted.html
引入RabbitMQ客户端和Junit单元测试的依赖:
4.1 HelloWordcom.rabbitmq amqp-client 5.9.0 junit junit 4.13.1
一个生产者、一个消费者、默认交换机、创建一个队列,路由Key默认为队列名。
使用默认交换机就是"",空字符串 ,默认路由Key就是队列名。
注意:生产者和消费者建议都声明队列
package com.lsh;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQConnectionUtil {
public static final String RABBIT_HOST = "172.16.98.100";
public static final int RABBIT_PORT = 5672;
public static final String RABBIT_USERNAME = "admin";
public static final String RABBIT_PWD = "admin";
public static final String RABBIT_VIRTUAL_HOST = "/";
public static Connection getConnection () throws Exception{
//1. 创建Connection工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2. 设置RabbitMQ的连接信息
connectionFactory.setHost(RABBIT_HOST);
connectionFactory.setPort(RABBIT_PORT);
connectionFactory.setUsername(RABBIT_USERNAME);
connectionFactory.setPassword(RABBIT_PWD);
connectionFactory.setVirtualHost(RABBIT_VIRTUAL_HOST);
//3. 返回连接对象
Connection connection = connectionFactory.newConnection();
return connection;
}
}
4.1.2 生产消息
构造Queue队列参数:
name 队列名
durable 是否持久化(如果我们声明一个持久队列,则为 true(该队列将在服务器重新启动后继续存在))
exclusive 是否声明一个独占队列 只允许一个监听者
autoDelete 如果服务器在不再使用队列时应该删除队列
arguments 用于声明队列的参数
package com.lsh;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
public class Publisher {
public static final String QUEUE_NAME = "hello";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建Channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.发送消息
String message = "Hello World!";
//默认交换机 "" ; 默认路由为队列名
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送成功!");
//read方法阻塞,查看WEB可视化界面的客户端连接数
System.in.read();
}
}
4.1.3 监听消息
package com.lsh;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class Consumer {
public static final String QUEUE_NAME = "hello";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建Channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.监听队列
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
}
};
//
channel.basicConsume(QUEUE_NAME,true,callback);
System.out.println("开始监听队列");
System.in.read();
}
}
4.2 Work Queue
一个生产者、两个消费者、一个队列、默认交换机、默认路由Key
4.2.1 一个生产者生产者和Hello World的形式是一样的,都是将消息推送到默认交换机。
package com.lsh.work;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
public class Publisher {
public static final String QUEUE_NAME = "work";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建Channel
Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.发送消息
for (int i = 0; i < 10; i++) {
//默认交换机 "" ; 默认路由为队列名
String message = "Work Queue :"+i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
System.out.println("消息发送成功!");
//read方法阻塞,查看WEB可视化界面的客户端连接数
System.in.read();
}
}
4.2.2 两个消费者轮询消费
让消费者关闭自动ack,手动确认ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息
package com.lsh.b_work;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class TwoConsumer {
public static final String QUEUE_NAME = "work";
@Test
public void consumer01() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建Channel
final Channel channel = connection.createChannel();
//3.构建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3.5 设置消息的流控 指定消费者一次拿几个消息
channel.basicQos(1);
//4.监听队列
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者01获取到消息:" + new String(body, "UTF-8"));
//手动确认消息 第二个参数false:是否批量操作
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//关闭消息自动确认
channel.basicConsume(QUEUE_NAME,false,callback);
System.out.println("开始监听队列");
System.in.read();
}
@Test
public void consumer02() throws Exception {
Connection connection = RabbitMQConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者02获取到消息:" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME,false,callback);
System.out.println("开始监听队列");
System.in.read();
}
}
4.3 Publish/Subscribe(FANOUT类型)
创建交换机为FANOUT类型,(发布/订阅)(分裂模式)
一个生产者、一个FANOUT类型交换机,两个队列,两个消费者(消息被送到达两个队列)
交换机和队列直接绑定,不需要routingKey。
package com.lsh.c_pubsub;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
public class FanOutExchangePublisher {
//交换机名称
public static final String EXCHANGE_NAME = "pubsub";
//队列1
public static final String QUEUE_NAME2 = "subscribe02";
//队列2
public static final String QUEUE_NAME1 = "subscribe01";
@Test
public void publish() throws Exception{
//1.获得链接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建channel
Channel channel = connection.createChannel();
//3.构建交换机 指定交换机类型为FANOUT
// 交换机类型:( DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//4.构建队列(队列名,是否持久,是否排他,是否自动删除,参数)
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
//5.绑定交换机和队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定,所以routingKey写和不写都是一样的
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
//6.发送消息到交换机 此处的routingKey没有用
channel.basicPublish(EXCHANGE_NAME,"",null,"Publish/Subscribe".getBytes());
System.out.println("消息发送成功!");
}
}
4.3.2 两个消费者一起消费
package com.lsh.c_pubsub;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class TwoConsumer {
//队列1
public static final String QUEUE_NAME2 = "subscribe02";
//队列2
public static final String QUEUE_NAME1 = "subscribe01";
@Test
public void consumer01() throws Exception {
Connection connection = RabbitMQConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.basicQos(1);
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者01获取到消息:" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME1,false,callback);
System.out.println("开始监听队列");
System.in.read();
}
@Test
public void consumer02() throws Exception {
Connection connection = RabbitMQConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
channel.basicQos(1);
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者02获取到消息:" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME2,false,callback);
System.out.println("开始监听队列");
System.in.read();
}
}
4.4 Routing (DIRECT类型)
创建交换机为DIRECT类型,直接类型
在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到 指定的Queue。
注意:与FANOUT类型交换机不同(FANOUT类型交换机直接绑定队列,不需要routingKey),但是DIRECT类型交换机必须绑定routingKey才能路由到对应的队列。
package com.lsh.d_routing;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
public class DirectExchangePublisher {
//交换机名称
public static final String EXCHANGE_NAME = "routing";
//队列1
public static final String QUEUE_NAME1 = "routing01";
//队列2
public static final String QUEUE_NAME2 = "routing02";
@Test
public void publish() throws Exception{
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//创建交换机 指定DIRECT直接类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//创建队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
//绑定交换机和队列并指定路由KEY
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");
// 将两个队列和一个交换机绑定
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");
//发送消息到交换机
//此消息会到达队列1
channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"橙子".getBytes());
//此消息会到达队列2
channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"小黑狗".getBytes());
//此消息没有对应的routingKey,所以会被丢弃
channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔".getBytes());
System.out.println("消息发送成功!");
}
}
4.4.2 监听消息
两个消费者同4.3.2一样
package com.lsh.d_direct;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class TwoConsumer {
public static final String QUEUE_NAME1 = "routing01";
public static final String QUEUE_NAME2 = "routing02";
@Test
public void consumer01() throws Exception {
Connection connection = RabbitMQConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.basicQos(1);
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者01获取到消息:" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME1,false,callback);
System.out.println("开始监听队列");
System.in.read();
}
@Test
public void consumer02() throws Exception {
Connection connection = RabbitMQConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
channel.basicQos(1);
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者02获取到消息:" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME2,false,callback);
System.out.println("开始监听队列");
System.in.read();
}
}
4.5 Topic (Topic主题模式)
4.5.1 生产者
创建Topic类型交换机
TOPIC类型可以编写带有特殊意义的routingKey的绑定方式
需要以aaa.bbb.ccc…方式编写routingkey ,
其中有两个特殊字符:*(相当于占位符),#(相当通配符)
package com.lsh.e_topic;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
public class TopicExchangePublisher {
//交换机名称
public static final String EXCHANGE_NAME = "topic";
//队列1
public static final String QUEUE_NAME1 = "TopicQueue01";
//队列2
public static final String QUEUE_NAME2 = "TopicQueue02";
//队列2
public static final String QUEUE_NAME3 = "TopicQueue03";
@Test
public void publish() throws Exception{
// 1、获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.获取通道
Channel channel = connection.createChannel();
//3.创建交换机 指定Topic主题类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//4.创建队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
channel.queueDeclare(QUEUE_NAME3,false,false,false,null);
//5.绑定交换机和队列并指定路由KEY
// TOPIC类型的交换机在和队列绑定时,需要以aaa.bbb.ccc..方式编写routingkey
// 其中有两个特殊字符:*(相当于占位符),#(相当通配符)
// 一个队列可以绑定多个路由规则
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(QUEUE_NAME3,EXCHANGE_NAME,"lazy.#");
//6.发送消息到交换机
//此路由Key符合"*.orange.*" 和 "*.*.rabbit" ,所以或到达队列1 和2
channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子".getBytes());
//此路由Key符合 "*.*.rabbit" ,所以或到达队列2
channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes());
//此路由Key符合 "lazy.#" ,所以或到达队列3
channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog",null,"懒狗狗".getBytes());
System.out.println("消息发送成功!");
}
}
4.5.2 消费者
消费者代码无更改,略
4.6 RPCRabbitMQ这种RPC模式一般使用的不多。
因为两个服务在交互时,可以尽量做到Client和Server的解耦,通过RabbitMQ进行解耦操作 需要让Client发送消息时,携带两个属性:
- replyTo告知Server将相应信息放到哪个队列correlationId告知Server发送相应消息时,需要携带位置标示来告知Client响应的信息
package com.lsh.f_rpc;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.UUID;
public class RpcPublisher {
//client端发出消息 队列
public static final String QUEUE_PUBLISHER = "rpc_publisher";
//server端发出响应 队列
public static final String QUEUE_CONSUMER = "rpc_consumer";
@Test
public void publisher()throws Exception{
// 1、获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.获取通道
final Channel channel = connection.createChannel();
//3. 构建队列
channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
//4. 发布消息
String message = "Hello RPC!";
final String uuid = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.replyTo(QUEUE_CONSUMER) //设置监听队列(即Server端的响应消息)
.correlationId(uuid) //设置UUID
.build();
channel.basicPublish("",QUEUE_PUBLISHER,props,message.getBytes());
System.out.println("消息发送成功!");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取唯一标识ID
String id = properties.getCorrelationId();
if (id != null && id.equals(uuid)){
//说明是我们发送的请求消息
System.out.println("接收到服务端响应:"+new String(body, "UTF-8"));
}
//手动ACK确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听响应队列
channel.basicConsume(QUEUE_CONSUMER,false,consumer);
System.in.read();
}
}
4.6.2 server服务端
package com.lsh.f_rpc;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.Date;
public class RpcConsumer {
//client端发出消息 队列
public static final String QUEUE_PUBLISHER = "rpc_publisher";
//server端发出响应 队列
public static final String QUEUE_CONSUMER = "rpc_consumer";
@Test
public void consumer01() throws Exception {
// 1.获得链接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
// 2.获得channel
final Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(QUEUE_PUBLISHER, false, false, false, null);
channel.queueDeclare(QUEUE_CONSUMER, false, false, false, null);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("服务端获取到消息:" + new String(body, "UTF-8"));
String resp = new Date()+":获取到了client发出的请求,这里是响应的信息";
//获取响应队列
String respQueueName = properties.getReplyTo();
//获取UUID
String uuid = properties.getCorrelationId();
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.correlationId(uuid)
.build();
//将响应信息发送到响应队列
channel.basicPublish("",respQueueName,props,resp.getBytes());
//手动确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_PUBLISHER,false,callback);
System.out.println("开始监听队列");
System.in.read();
}
}
4.7 Headers
Headers类型交换机,(这个用的不多,与Topic类型相似)
public class HeadersPublisher {
public static final String EXCHANGE_NAME = "headers-exchange";
public static final String QUEUE_NAME = "headers-queue";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建Channel
Channel channel = connection.createChannel();
//3.构建Headers类型交换机,创建队列并绑定
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
Map arguments = new HashMap<>();
//x-match 设置为all:表示所有条件都满足才能路由,any:表示有一个条件满足就可以路由
arguments.put("x-match","all");
// arguments.put("x-match","any");
arguments.put("name","jack");
arguments.put("age","23");
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"",arguments);
HashMap headers = new HashMap<>();
headers.put("name","jack");
headers.put("age","23");
//4.发送消息
String msg = "这是Headers类型消息";
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.headers(headers)
.build();
channel.basicPublish(EXCHANGE_NAME,"",properties,msg.getBytes());
System.out.println("消息已发送");
System.in.read();
}
}
五、RabbitMQ整合SpringBoot
5.1 构建SpringBoot项目
org.springframework.boot spring-boot-starter-parent 2.2.2.RELEASE org.springframework.boot spring-boot-starter-test org.springframework.boot spring-boot-starter-amqp
配置application.yml文件:
spring:
rabbitmq:
host: 172.16.98.100
port: 5672
username: admin
password: admin
virtual-host: /
5.2 声明交换机、队列、绑定
在SpringBoot项目中,通过Configuration配置类来声明队列和交换机:
Exchange:在SpringBoot项目中,直接通过ExchangeBuilder来构造交换机Queue:声明队列:在SpringBoot中,通过QueueBuilder.durable(队列名)来构造队列Binding:声明绑定:在SpringBoot项目中,通过BindingBuilder.bind(队列).to(交换机).with(路由Key)构造绑定
package com.lsh.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String TOPIC_EXCHANGE_NAME = "boot-exchange";
public static final String QUEUE_NAME = "boot-queue";
public static final String ROUTING_KEY = "*.black.*";
@Bean
public Exchange exchange(){
// => channel.DeclareExchange
Exchange exchange = ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).build();
return exchange;
}
@Bean
public Queue queue(){
Queue queue = QueueBuilder.durable(QUEUE_NAME).build();
return queue;
}
@Bean
public Binding binding(Exchange exchange,Queue queue){
Binding binding = BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
return binding;
}
}
5.3 发送消息
在SpringBoot项目中通过RabbitTemplate对象调用RabbitMQ API
package com.lsh;
import com.lsh.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class SendMQTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void send(){
//交换机、路由Key、消息内容
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME,"little.black.rabbit","小黑兔");
System.out.println("消息已发送");
}
@Test
public void sendAndMsgProperties(){
//交换机、路由Key、消息内容、MessageProperties(传递Msg信息:包括CorrelationId、ReplyTo等)
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME, "little.black.rabbit", "小黑兔", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
//设置唯一标识
messageProperties.setCorrelationId("123");
//设置响应队列
// messageProperties.setReplyTo();
return message;
}
});
System.out.println("消息已发送");
}
}
5.4 监听消息
在SpringBoot项目中监听消息,通过@RabbitListener(queues = “队列名”) 注解监听队列
package com.lsh.springboot.listener;
import com.lsh.springboot.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ConsumeListener {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void consumer(String msg, Channel channel, Message message) throws IOException {
System.out.println("队列的消息:"+msg);
String correlationId = message.getMessageProperties().getCorrelationId();
System.out.println("唯一标识:"+correlationId);
//手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
六、RabbitMQ保证消息可靠性
6.1 保证消息到达交换机
/confirm/i机制
可以通过/confirm/i效果保证消息一定送达到Exchange。
//4.开启/confirm/is
channel./confirm/iSelect();
//5.设置/confirm/is的异步回调
channel.add/confirm/iListener(new /confirm/iListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//消息成功发送到交换机 success
System.out.println("消息成功发送到交换机!");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//消息未成功发送到交换机 fail
System.out.println("消息未成功发送到交换机!");
}
});
6.2 保证消息路由到队列
Return机制
通过Return机制保证消息到达队列(注意:只有消息没有成功到达队列时才会触发回调函数)
//6.设置Return回调,确认消息是否到达队列,需要在发送消息时,设置mandatory参数为true开启Return机制
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
//只有消息没有到达指定队列时,才会触发此函数(例如RoutingKey不对导致消息没有投递到对应队列,就会触发该回调函数)
System.out.println("消息没有到达指定队列!");
}
});
//8.发送消息 注意:此处需要设置mandatory参数为true,才能开启Return机制
//参数:交换机、路由Key、mandatory、指定参数、消息
channel.basicPublish("","/confirm/is",true,pop,message.getBytes());
6.3 保证队列可以持久化消息
设置deliveryMode为2表示开启消息持久化,MQ重启后消息不会消失
并且队列同样需要设置持久化。
//7.开启消息持久化,如果没有开启消息持久化。如果MQ重启,则消息会丢失
AMQP.BasicProperties pop = new AMQP.BasicProperties()
.builder()
//设置deliveryMode为2表示开启消息持久化,MQ重启后消息不会消失
.deliveryMode(2)
.build();
# 重启RabbitMQ服务 yum 部署方式 systemctl restart rabbitmq-server.service # Docker docker restart rabbitmq
当我们没有开启队列持久化和消息持久化时,如果队列中有消息未消费,重启RabbitMQ服务,则重启后的RabbitMQ服务的队列中原来的消息被丢失。
而当我们开启了队列持久化和消息持久化之后,重启RabbitMQ服务后,消息不会丢失。
package com.lsh.g_/confirm/is;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class /confirm/isPublisher {
public static final String QUEUE_NAME = "/confirm/is";
@Test
public void publisher() throws Exception {
// 1.获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.构建Channel
Channel channel = connection.createChannel();
//3.构建队列 注意此处的durable参数只是控制队列持久化,并不能控制消息持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//4.开启/confirm/is
channel./confirm/iSelect();
//5.设置/confirm/is的异步回调
channel.add/confirm/iListener(new /confirm/iListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//消息成功发送到交换机 success
System.out.println("消息成功发送到交换机!");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//消息未成功发送到交换机 fail
System.out.println("消息未成功发送到交换机!");
}
});
//6.设置Return回调,确认消息是否到达队列,需要在发送消息时,设置mandatory参数为true开启Return机制
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
//只有消息没有到达指定队列时,才会触发此函数(例如RoutingKey不对导致消息没有投递到对应队列,就会触发该回调函数)
System.out.println("消息没有到达指定队列!");
}
});
//7.开启消息持久化,如果没有开启消息持久化。如果MQ重启,则消息会丢失
AMQP.BasicProperties pop = new AMQP.BasicProperties()
.builder()
//设置deliveryMode为2表示开启消息持久化,MQ重启后消息不会消失
.deliveryMode(2)
.build();
String message = "Confirms Messaage!";
//8.发送消息 注意:此处需要设置mandatory参数为true,才能开启Return机制
channel.basicPublish("","/confirm/is",true,pop,message.getBytes());
System.out.println("消息发送成功!");
//read方法阻塞,查看WEB可视化界面的客户端连接数
System.in.read();
}
}
6.4 保证消费者可以正常消费消息
通过手动ACK确保业务代码在执行完成后再执行消息确认
package com.lsh.g_/confirm/is;
import com.lsh.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class /confirm/isConsumer {
public static final String QUEUE_NAME = "/confirm/is";
@Test
public void consumer() throws Exception {
//1.获得连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();
//2.获得Channel
final Channel channel = connection.createChannel();
//3.声明队列,持久化队列 durable为true
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//4.设置CallBack函数
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("监听到队列消息:" + new String(body, "UTF-8"));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//5.监听队列,关闭自动ack
channel.basicConsume(QUEUE_NAME,false,callback);
System.out.println("开始监听队列");
System.in.read();
}
}
6.5 使用SpringBoot实现消息可靠
- 在application.yml配置文件中通过配置spring.rabbitmq.publisher-/confirm/i-type为correlated开启/confirm/is机制
spring:
rabbitmq:
# publisher-/confirm/is: true # SpringBoot 2.1版本以下 使用 已弃用
publisher-/confirm/i-type: correlated # 开启/confirm/is确认(2.1版本后)
- 在rabbitTemplate.setConfirmCallback()设置/confirm/is机制的回调函数
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("消息已送达交换机!");
}else {
System.out.println("消息未到达交换机!");
}
}
});
- application.yml配置spring.rabbitmq.publisher-returns为true开启Return机制。
spring:
rabbitmq:
publisher-returns: true # 开启Return机制,确保消息成功路由到队列
- 通过rabbitTemplate.setReturnCallback()方法这是Return机制的回调函数。
注意 :只用SpringBoot项目投递消息时,不需要在设置mandatory参数为true
//注意:低版本使用setReturnCallback()方法;在高版本中该方法被弃用,使用setReturnsCallback()方法
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) throws UnsupportedEncodingException {
String msg = new String(message.getBody());
System.out.println("消息未成功投递到队列:"+msg);
}
});
- 设置消息持久化
//在声明队列的时候直接构造持久化队列
@Bean
public Queue queue(){
//nonDurable表示不持久化 ;durable表示持久化
Queue queue = QueueBuilder.durable(QUEUE_NAME).build();
return queue;
}
//发送消息 设置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
rabbitTemplate.convertAndSend("", "/confirm/iss", "SpringBoot Confirms Message!", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// MessageDeliveryMode枚举类:
// NON_PERSISTENT 表示不持久化 ;PERSISTENT表示持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
完整SpringBoot确保消息到达队列生产者代码
@Test
public void sendWith/confirm/is(){
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("消息已送达交换机!");
}else {
System.out.println("消息未到达交换机!");
}
}
});
//注意:低版本使用setReturnCallback()方法;在高版本中该方法被弃用,使用setReturnsCallback()方法
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
String msg = new String(message.getBody());
System.out.println("消息未成功投递到队列:"+msg);
}
});
//注意 :只用SpringBoot项目投递消息时,不需要在设置mandatory参数为true
//发送消息 设置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
rabbitTemplate.convertAndSend("", "/confirm/iss", "SpringBoot Confirms Message!", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// MessageDeliveryMode枚举类:
// NON_PERSISTENT 表示不持久化 ;PERSISTENT表示持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
}
七、死信队列&延时交换机
成为死信有三种方式:
- 消息被拒绝并且禁止重新被投放回队列消息过期(设置消息的TTL或者设置队列的TTL ,两种方式都能使消息过期)队列内消息超过最大队列长度
两个队列(普通队列、死信队列)
两个交换机(普通交换机、死信交换机)
两个路由(普通路由、死信路由)
其实死信队列/交换机/路由和普通的都是一样的,在构造普通队列的时候指定另一个交换机为死信交换机即可。
package com.lsh.springboot.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeadLetterConfig {
public static final String NORMAL_EXCHANGE = "normal-exchange";
public static final String NORMAL_QUEUE = "normal-queue";
public static final String NORMAL_ROUTING_KEY = "normal.#";
public static final String DEAD_EXCHANGE = "dead-exchange";
public static final String DEAD_QUEUE = "dead-queue";
public static final String DEAD_ROUTING_KEY = "dead.#";
@Bean
public Exchange normalExchange(){
return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
}
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey("dead.abd")
// .ttl(5000) // 设置队列的TTL(消息存活时间)
// .maxLength(1) //设置队列最大长度
.build();
}
@Bean
public Binding normalBinding(Exchange normalExchange,Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
}
@Bean
public Exchange deadExchange(){
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
}
@Bean
public Queue deadQueue(){
return QueueBuilder.durable(DEAD_QUEUE).build();
}
@Bean
public Binding deadBinding(Exchange deadExchange,Queue deadQueue){
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
}
7.1 消息被拒绝或者Nack
//1.监听普通队列消息,拒绝或者不ack消息,并且requeue为false禁止重新投递队列,则消息会进入死信队列
@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
public void consumer1(String msg, Channel channel, Message message) throws IOException {
System.out.println("监听到普通队列消息:"+msg);
//拒绝消息或者不ack确认消息
//设置拒绝消息,并禁止重新投递队列requeue=false
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
// requeue如果为true则重新排队,如果为false则被丢弃或者进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
7.2 消息过期
设置TTL(TimeToLive)有两种方式:
- 直接设置消息的过期时间(存在问题,如果第一个消息TTL为30秒,第二个消息TTL为3秒,此时第二个消息必须等待第一个消息过期之后才能过期)通过设置队列的过期时间
注意:通过消息过期的方式使消息进入死信队列,消费者不能监听普通队列,需要监听死信队列。
//1.设置消息的TTL
@Test
public void sendDeadLetterQueueAndSetTTL(){
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.ttl", "通过设置TTL消息存活时间,使消息进入死信队列", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息存活时间 String 类型 单位为毫秒
message.getMessageProperties().setExpiration("5000");
return message;
}
});
System.out.println("消息已发送");
}
//2.设置队列的TTL
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey("dead.abd")
.ttl(5000)//设置队列的TTL 单位为毫秒
.maxLength(1)
.build();
}
7.3 消息超过队列最大长度
在声明队列的时候设置队列的最大长度,则超过这个最大长度后的消息都会被进入死信队列或者被丢弃
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey("dead.abd")
.ttl(5000)
.maxLength(1) //设置队列最大长度
.build();
}
7.4 延迟交换机(RabbitMQ Plugins)
使用延时交换机的方式对消息延时是最合适的,因为以上的方式都存在问题:
- 如果给消息设置过期时间,队列消息是按顺序消费的,后面的消息只能等前面那消息处理后才能被处理,如果后面的消息已经过期了但是前面的消息还没有被处理,则后面的消息无法被过期如果给队列设置过期时间,则更不方便,如果消息需要分别延时不同的时间的话,那就只能分别创建多个不同的队列
这种方式是将消息直接存放在队列中等待过期的,时间不准确,并且有弊端。
而直接使用延时交换机来进行延时处理的话,消息是被存放在延时交换机中的,等待到达延时时间后,才会被投递到队列中,从而直接消费。
注意: 由于消息延时在交换机中,未到达队列中,所以如果如果消息设置了Return机制,则由于消息被延时投递,还未到达队列此时会触发Return回调函数,
并且如果此时RabbitMQ服务重启了,存在延时交换机中的消息会被丢失。
延迟交换机属于RabbitMQ的插件了,需要下载插件,开启配置才能实现消息延时
官网插件地址:https://www.rabbitmq.com/community-plugins.html
延时交换机插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
将插件拷贝到/rabbitmq_server-3.8.3/plugins目录下,然后进入sbin目录,执行命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange。
注意:通过yum下载的RabbitMQ的目录在 /usr/lib/
注意版本对应。
此处出现的错误信息unknown exchange type 'x-delayed-message'可以看到,是因为RabbitMQ此时还未启动延时交换机的插件,但是却使用了x-delayed-message的属性。所以导致报错。
1、构造arguments参数 指定交换机类型x-delayed-type为topic
2、指定交换机type为x-delayed-message类型
package com.lsh.springboot.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class DelayedExchangeConfig {
public static final String DELAYED_EXCHANGE_NAME = "boot-delayed-exchange";
public static final String DELAYED_QUEUE_NAME = "boot-delayed-queue";
public static final String DELAYED_ROUTING_KEY = "*.delayed.*";
//普通队列
@Bean
public Queue delayedQueue(){
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}
@Bean
public Exchange delayedExchange(){
HashMap arguments = new HashMap<>();
arguments.put("x-delayed-type","topic");
CustomExchange customExchange = new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
return customExchange;
}
@Bean
public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
7.6 发送延时消息
通过message.getMessageProperties().setDelay(30000)设置消息延时时间,单位为毫秒。
//向延时交换机投递延时消息,如果如果消息设置了Return机制,则由于消息被延时投递,还未到达队列此时会触发Return回调函数
@Test
public void sendDelayedExchange(){
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKe)->{
System.out.println("消息未投递到队列");
});
rabbitTemplate.convertAndSend(DelayedExchangeConfig.DELAYED_EXCHANGE_NAME, "little.delayed.rabbit", "小延时兔子", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置延时时间 单位为毫秒
message.getMessageProperties().setDelay(30000);
return message;
}
});
System.out.println("消息已发送");
}
八、集群高可用
TODO



