栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Rabbitmq消息应答

Rabbitmq消息应答

消息应答

RabbitMq默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改 为手动应答,

生产者
public class Task2 {


    //队列名称
    public static final String QUEUE_NAME = "ack_queue";


    public static void main(String[] args) throws Exception{
        //声明队列
        Channel channel = RabbitMqUtils.getChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //从控制台中输入信息
        Scanner in = new Scanner(System.in);
        while(in.hasNext()){
            String message = in.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("生产者发出消息:消息是"+message);

        }

    }





}

消费者C1
public class Work03 {

    //队列名称
    public static final String QUEUE_NAME = "ack_queue";


    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息处理时间短");

        //消息的接收
        DeliverCallback deliverCallback = (consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            //睡眠1秒
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("接收到的消息:"+new String(message));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };

        //采用手动应答
        boolean autoAck = false;

        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        });



    }
}

消费者C2
public class Work04 {

    //队列名称
    public static final String QUEUE_NAME = "ack_queue";


    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息处理时间长");

        //消息的接收
        DeliverCallback deliverCallback = (consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            //睡眠30秒
            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("接收到的消息:"+new String(message));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };

        //采用手动应答
        boolean autoAck = false;

        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        });



    }
}

在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/613039.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号