目录
- 一、依赖
- 二、代码
- 1、死信队列
- 1.1、TTL过期
- 1.1.1、生产者
- 1.1.2、普通消费者
- 1.1.3、死信消费者
- 1.2、队列长度有限
- 1.2.1、生产者
- 1.2.2、普通消费者
- 1.2.3、普通消费者
- 1.3、消费者拒绝
- 1.3.1、生产者
- 1.3.2、普通消费者
- 1.3.3、死信消费者
- 2、优先级队列
-
- 3、自定义延迟交换机
-
- 4、备份交换机
- 4.1、生产者
- 4.2、普通消费者
- 4.3、备份消费者
一、依赖
com.rabbitmq
amqp-client
5.14.2
二、代码
1、死信队列
1.1、TTL过期
1.1.1、生产者
public class Provider {
// 队列名称
private static final String COMMON_EXCHANGE_NAME = "common_exchange";
private static final String COMMON_QUEUE_NAME = "common_queue";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
) {
channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
AMQP.BasicProperties properties = new AMQP.BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null).builder().expiration("10000").build();
for (int i = 1; i <= 10; i++) {
// 消息内容
String message = "Hello World!" + i;
channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.1.2、普通消费者
public class Consumer1 {
// 队列名称
private static final String COMMON_EXCHANGE_NAME = "common_exchange";
private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
private static final String COMMON_QUEUE_NAME = "common_queue";
private static final String DEAD_QUEUE_NAME = "dead_queue";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
try {
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
// 创建普通交换机
channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 创建死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 为死信指明死信交换机和死信路由
Map arguments = new HashMap<>();
// 设置死信的目的交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 设置死信携带的路由
arguments.put("x-dead-letter-routing-key", DEAD_QUEUE_NAME);
channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, arguments);
channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
// 绑定普通队列和普通交换机
channel.queueBind(COMMON_QUEUE_NAME, COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME);
// 绑定死信队列和死信交换机
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
// 消息创建中的回调对象;用来处理从队列异步推送过来的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(str);
};
channel.basicConsume(COMMON_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.1.3、死信消费者
public class Consumer2 {
// 队列名称
private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
private static final String DEAD_QUEUE_NAME = "dead_queue";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
try {
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
// 创建死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 创建死信队列
channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
// 绑定死信队列和死信交换机
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
// 消息创建中的回调对象;用来处理从队列异步推送过来的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(str);
};
channel.basicConsume(DEAD_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.2、队列长度有限
1.2.1、生产者
public class Provider {
// 队列名称
private static final String COMMON_EXCHANGE_NAME = "common_exchange";
private static final String COMMON_QUEUE_NAME = "common_queue";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
) {
channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
for (int i = 1; i <= 10; i++) {
// 消息内容
String message = "Hello World!" + i;
channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.2.2、普通消费者
public class Consumer1 {
// 队列名称
private static final String COMMON_EXCHANGE_NAME = "common_exchange";
private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
private static final String COMMON_QUEUE_NAME = "common_queue";
private static final String DEAD_QUEUE_NAME = "dead_queue";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
try {
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
// 创建普通交换机
channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 创建死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 为死信指明死信交换机和死信路由
Map arguments = new HashMap<>();
// 设置普通队列最大长度,也就是队列同时最多能存在的消息数量
arguments.put("x-max-length", 6);
// 设置死信的目的交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 设置死信携带的路由
arguments.put("x-dead-letter-routing-key", DEAD_QUEUE_NAME);
channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, arguments);
channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
// 绑定普通队列和普通交换机
channel.queueBind(COMMON_QUEUE_NAME, COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME);
// 绑定死信队列和死信交换机
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
// 消息创建中的回调对象;用来处理从队列异步推送过来的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(str);
};
channel.basicConsume(COMMON_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.2.3、普通消费者
public class Consumer2 {
// 队列名称
private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
private static final String DEAD_QUEUE_NAME = "dead_queue";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
try {
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
// 创建死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 创建死信队列
channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
// 绑定死信队列和死信交换机
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
// 消息创建中的回调对象;用来处理从队列异步推送过来的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(str);
};
channel.basicConsume(DEAD_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.3、消费者拒绝
1.3.1、生产者
public class Provider {
// 队列名称
private static final String COMMON_EXCHANGE_NAME = "common_exchange";
private static final String COMMON_QUEUE_NAME = "common_queue";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
) {
channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
for (int i = 1; i <= 10; i++) {
// 消息内容
String message = "Hello World!" + i;
channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.3.2、普通消费者
public class Consumer1 {
// 队列名称
private static final String COMMON_EXCHANGE_NAME = "common_exchange";
private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
private static final String COMMON_QUEUE_NAME = "common_queue";
private static final String DEAD_QUEUE_NAME = "dead_queue";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
try {
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
// 创建普通交换机
channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 创建死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 为死信指明死信交换机和死信路由
Map arguments = new HashMap<>();
// 设置死信的目的交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 设置死信携带的路由
arguments.put("x-dead-letter-routing-key", DEAD_QUEUE_NAME);
channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, arguments);
channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
// 绑定普通队列和普通交换机
channel.queueBind(COMMON_QUEUE_NAME, COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME);
// 绑定死信队列和死信交换机
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
// 消息创建中的回调对象;用来处理从队列异步推送过来的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(COMMON_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.3.3、死信消费者
public class Consumer2 {
// 队列名称
private static final String DEAD_EXCHANGE_NAME = "dead_exchange";
private static final String DEAD_QUEUE_NAME = "dead_queue";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 不使用try-with-resource语法的原因是“我们希望进程在消费者异步监听消息到达时始终保持活动状态”,不希望try中代码运行结束,就关闭Connection和Channel
try {
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
// 创建死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 创建死信队列
channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
// 绑定死信队列和死信交换机
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME);
// 消息创建中的回调对象;用来处理从队列异步推送过来的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(str);
};
channel.basicConsume(DEAD_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、优先级队列
2.1、生产者
public class Provider {
// 普通交换机
public static final String COMMON_EXCHANGE_NAME = "common_exchange3";
// 普通队列
public static final String COMMON_QUEUE_NAME = "common_queue3";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
) {
// 声明交换机
channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 消息发送
for (int i = 1; i <= 10; i++) {
String message = "测试消息" + i;
if (i == 5) {
// 默认优先级都是0,设置5号消息的优先级是5
AMQP.BasicProperties properties = new AMQP.BasicProperties("text/plain",
null,
null,
2,
5, null, null, null,
null, null, null, null,
null, null).builder().build();
channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
} else {
channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.2、消费者
public class Consumer {
// 普通交换机
public static final String COMMON_EXCHANGE_NAME = "common_exchange3";
// 普通队列
public static final String COMMON_QUEUE_NAME = "common_queue3";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 创建队列
Map arguments = new HashMap<>(1);
arguments.put("x-max-priority", 10);
channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, arguments);
// 绑定普通队列和普通交换机
channel.queueBind(COMMON_QUEUE_NAME, COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME);
// 消息创建中的回调对象;用来处理从队列异步推送过来的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(str);
};
channel.basicConsume(COMMON_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、自定义延迟交换机
3.1、生产者
public class Provider {
// 延时交换机
public static final String DELAY_EXCHANGE_NAME = "delay_exchange2";
// 普通队列
public static final String COMMON_QUEUE_NAME = "common_queue2";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
) {
// 创建延迟交换机;注意交换机类型是"x-delayed-message",另外该交换机本质上还是一个direct类型交换机
Map arguments = new HashMap<>(1);
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
// 10s消息;说明:10s消息应该比2s的消息更晚到达
String message = "我是10s的消息";
HashMap header = new HashMap<>();
// 设置延迟10s:这个延迟操作是在交换机中进行了,10s之后会将消息发送给消息队列,当然本次使用的交换机是延迟交换机
header.put("x-delay", 10000);
AMQP.BasicProperties properties = new AMQP.BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null).builder().headers(header).build();
channel.basicPublish(DELAY_EXCHANGE_NAME, COMMON_QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
// 2s消息;说明:2s消息应该比10s的消息更早到达
message = "我是2s的消息";
header = new HashMap<>();
header.put("x-delay", 2000);
properties = new AMQP.BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null).builder().headers(header).build();
channel.basicPublish(DELAY_EXCHANGE_NAME, COMMON_QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.2、消费者
public class Consumer {
// 延时交换机
public static final String DELAY_EXCHANGE_NAME = "delay_exchange2";
// 普通队列
public static final String COMMON_QUEUE_NAME = "common_queue2";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
// 创建延迟交换机
Map arguments = new HashMap<>(1);
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
// 创建队列
channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, null);
// 绑定普通队列和延迟交换机
channel.queueBind(COMMON_QUEUE_NAME, DELAY_EXCHANGE_NAME, COMMON_QUEUE_NAME);
// 消息创建中的回调对象;用来处理从队列异步推送过来的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(str);
};
channel.basicConsume(COMMON_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
4、备份交换机
4.1、生产者
public class Provider {
// 普通交换机
public static final String COMMON_EXCHANGE_NAME = "common_exchange5";
// 备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange5";
// 普通队列
public static final String COMMON_QUEUE_NAME = "common_queue5";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
) {
// 声明普通交换机
Map arguments = new HashMap<>();
arguments.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, arguments);
// 消息发送
String message = "测试消息1";
channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
// 消息发送(说明:我们故意将路由键写错,让交换机无法传递消息到普通队列,然后消息被被传递到了备份交换机中,然后我们的备份队列就可以收到消息了)
message = "测试消息2";
channel.basicPublish(COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME + "1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.2、普通消费者
public class Consumer1 {
// 普通交换机
public static final String COMMON_EXCHANGE_NAME = "common_exchange5";
// 备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange5";
// 普通队列
public static final String COMMON_QUEUE_NAME = "common_queue5";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
// 声明普通交换机
Map arguments = new HashMap<>();
arguments.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
channel.exchangeDeclare(COMMON_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, arguments);
// 声明备份交换机
channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 创建队列
channel.queueDeclare(COMMON_QUEUE_NAME, true, false, false, null);
// 绑定死信队列和死信交换机
channel.queueBind(COMMON_QUEUE_NAME, COMMON_EXCHANGE_NAME, COMMON_QUEUE_NAME);
// 消息创建中的回调对象;用来处理从队列异步推送过来的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String str = new java.lang.String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("普通队列接收到的消息:" + str);
};
channel.basicConsume(COMMON_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.3、备份消费者
public class Consumer2 {
// 普通交换机
public static final String COMMON_EXCHANGE_NAME = "common_exchange5";
// 备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange5";
// 普通队列
public static final String COMMON_QUEUE_NAME = "common_queue5";
// 备份队列
public static final String BACKUP_QUEUE_NAME = "backup_queue5";
public static void main(String[] args) {
// 创建连接工厂,配置连接相关设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
// 根据上面设置的参数来创建连接
Connection connection = factory.newConnection();
// 创建通道;一个连接里面可以创建多个通道,而通道用于消息发送和接收,这样也可以减少资源浪费
Channel channel = connection.createChannel();
// 声明备份交换机
channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 创建队列
channel.queueDeclare(BACKUP_QUEUE_NAME, true, false, false, null);
// 绑定死信队列和死信交换机
channel.queueBind(BACKUP_QUEUE_NAME, BACKUP_EXCHANGE_NAME, "");
// 消息创建中的回调对象;用来处理从队列异步推送过来的消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("备份队列接收到的消息:" + str);
};
channel.basicConsume(BACKUP_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}