死信队列原理图:
Producer.java
package com.zheng.eight;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.zheng.utils.RabbitMqUtil;
public class Producer {
public static final String NORMAL_EXCHANGE= "normal_exchange";
//普通交换机的名称
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//死信消息 设置ttl时间
for (int i = 0; i < 11; i++) {
String message = "info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
Consumer01.java
package com.zheng.eight;
import com.rabbitmq.client.*;
import com.zheng.utils.RabbitMqUtil;
import java.util.HashMap;
import java.util.Map;
/**
 * Consumer 1 of the dead-letter demo.
 *
 * <p>Declares the normal and dead-letter exchanges and queues, wires the
 * normal queue so that its dead letters are routed to the dead-letter
 * exchange, and then consumes from the normal queue.
 */
public class Consumer01 {
    /** Name of the normal exchange. */
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    /** Name of the dead-letter exchange. */
    public static final String DEAD_EXCHANGE = "dead_exchange";
    /** Name of the normal queue. */
    public static final String NORMAL_QUEUE = "normal_queue";
    /** Name of the dead-letter queue. */
    public static final String DEAD_QUEUE = "dead_queue";

    /**
     * Sets up the exchange/queue topology and starts consuming the normal queue.
     *
     * @param args unused
     * @throws Exception if obtaining the channel or any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();

        // Both exchanges are direct exchanges.
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String, Object> arguments = new HashMap<>();
        // Dead letters of the normal queue go to the dead-letter exchange ...
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // ... and are re-published with this routing key.
        arguments.put("x-dead-letter-routing-key", "lisi");
        // Queue-level message TTL, in milliseconds.
        arguments.put("x-message-ttl", 100000);

        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        // The dead-letter queue must be bound to the DEAD exchange: dead letters
        // are published there with key "lisi", so a binding on the normal
        // exchange would never receive them.
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Consumer01接收的消息是" + body);
        };
        // Invoked by the client when this consumer is cancelled.
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("未确认消息");
        };

        // autoAck = true: deliveries are acknowledged automatically.
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback, null);
    }
}
Consumer02.java
package com.zheng.eight;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zheng.utils.RabbitMqUtil;
/**
 * Consumer 2 of the dead-letter demo: reads messages from the dead-letter
 * queue and prints each body.
 */
public class Consumer02 {
    /** Name of the dead-letter queue this consumer reads from. */
    public static final String DEAD_QUEUE = "dead_queue";

    /**
     * Starts an auto-acknowledging consumer on {@link #DEAD_QUEUE}.
     *
     * @param args unused
     * @throws Exception if obtaining the channel or registering the consumer fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        System.out.println("等待接收消息");

        // Print the body of every delivered message, decoded as UTF-8.
        DeliverCallback onDelivery = (tag, delivery) -> {
            String text = new String(delivery.getBody(), "utf-8");
            System.out.println("Consumer02接收的消息是:" + text);
        };

        // Nothing to do when the consumer is cancelled.
        CancelCallback onCancel = tag -> { };

        channel.basicConsume(DEAD_QUEUE, true, onDelivery, onCancel, null);
    }
}
要想看懂代码,必须先看懂原理图。



