RabbitMq默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改 为手动应答,
生产者
public class Task2 {
//队列名称
public static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
//声明队列
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//从控制台中输入信息
Scanner in = new Scanner(System.in);
while(in.hasNext()){
String message = in.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("生产者发出消息:消息是"+message);
}
}
}
消费者C1
public class Work03 {
//队列名称
public static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接收消息处理时间短");
//消息的接收
DeliverCallback deliverCallback = (consumerTag, delivery)->{
String message= new String(delivery.getBody());
//睡眠1秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接收到的消息:"+new String(message));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
});
}
}
消费者C2
public class Work04 {
//队列名称
public static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2等待接收消息处理时间长");
//消息的接收
DeliverCallback deliverCallback = (consumerTag, delivery)->{
String message= new String(delivery.getBody());
//睡眠30秒
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接收到的消息:"+new String(message));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
});
}
}
在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了



