1.2 应用场景死信就是无法被消费的消息,消费者从队列中取消息时,由于某些特定原因导致消息无法被消费,即没有了后续的处理,就变成了死信继而有了死信队列。
1.3 死信来源可以保证消息不会消失,如果消费者在进行消费时发送异常,可以先放到死信队列中,等后面运行环境好了之后再进行消费。
- 消息TTL过期;队列达到最大长度;消息被拒绝。
架构图:
- 正常情况下使用normal交换机绑定给normal队列C1进行消费;出现异常情况就转发给 dead交换机绑定给dead队列让C2进行消费。
测试过程:
- 开启消费者1构建交换机队列RoutingKey之间的关系;然后关闭消费者1;开启生产者发送消息,等待十秒之后消息过期就会到达死信队列;然后启动消费者2接收死信队列里面的消息。
消费者1:
public class Consumer1 {
// 队列名称
private static final String NORMAL_QUEUE = "normal_queue";
private static final String DEAD_QUEUE = "dead_queue";
// 交换机名称
private static final String NORMAL_EXCHANGE = "e_ch";
private static final String DEAD_EXCHANGE = "d_ch";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
// 声明队列和交换机类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
Map arguments = new HashMap<>();
// 设置过期时间 10秒 ; 换做在发送方指定。
// arguments.put("x-message-ttl",10000);
// 设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
// 设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","lisi");
// 创建队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false
,arguments);
// 绑定RoutingKey
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
// 创建死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("C1等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) ->{
System.out.println("C1接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(NORMAL_QUEUE,true, deliverCallback, consumerTag ->{});
}
}
消费者2:
public class Consumer2 {
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
System.out.println("C2等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) ->{
System.out.println("C2接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(DEAD_QUEUE,true, deliverCallback, consumerTag ->{});
}
}
生产者:
public class Producer {
private static final String NORMAL_EXCHANGE = "e_ch";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
// 声明交换机。
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 设置TTL时间存活时间 10 秒
AMQP.BasicProperties properties =
new AMQP.BasicProperties()
.builder().expiration("10000").build();
for (int i = 0; i < 11; i++) {
String msg = "message" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,msg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送成功:" + msg);
}
}
}
1.4.2 队列达到最大长度
修改消费者1,指定最多长度为6条消息,超过这6条消息之后的消息会进入到死信中。
消费者1给村参数的map新增一条:
arguments.put("x-max-length",6);
删除生产者中对队列消息事件的限制。
如果出现406错误就是已经存在队列,可以删除原有队列或者修改一个新名字。
修改消费者1,拒绝最后一位为双数的消息。
消费者1:
public class Consumer1 {
// 队列名称
private static final String NORMAL_QUEUE = "normal_queue1";
private static final String DEAD_QUEUE = "dead_queue1";
// 交换机名称
private static final String NORMAL_EXCHANGE = "e_ch1";
private static final String DEAD_EXCHANGE = "d_ch1";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
// 声明队列和交换机类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
Map arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
// 设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","lisi");
// 创建队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false
,arguments);
// 绑定RoutingKey
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
// 创建死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("C1等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) ->{
// 拒绝最后为单数的消息
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
if (Integer.parseInt(msg.substring(msg.length()-1)) % 2 == 0){
System.out.println("C1拒绝的消息:" + msg +"----------===========-----------");
// 拒绝消息
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else{
System.out.println("C1接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
// 开启手动应答
channel.basicConsume(NORMAL_QUEUE,false, deliverCallback, consumerTag ->{});
}
}
先启动消费者1创建对应关系,随之启动消费者2,再启动生产者。
生产者发送:
消费者1:
消费者2:



