栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ 消息持久化 消费者手动ACK

RabbitMQ 消息持久化 消费者手动ACK

生产者

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("39.106.2");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
     
        Channel channel = connection.createChannel();
        channel.queueDeclare("test", true, false, false, null);//Server端的Queue持久化
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.contentEncoding("UTF-8");   //编码集
        builder.deliveryMode(2); //送消息设置发送模式 持久化模式
        AMQP.BasicProperties build = builder.build();
    
        for(int i=0; i < 10; i++){
            JSonObject jo = new JSONObject();
            jo.put("id",i+"");
            jo.put("name","刘"+i);
            jo.put("old",i+20+"");
            //推消息
            channel.basicPublish("", "test001", build, jo.toString().getBytes());
        }
        channel.close();
        connection.close();


消费端,模拟报错,不报错情况下手动ACK,报错不ACK

//1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("39.106.2");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();

        //4 声明(创建)一个队列
        String queueName = "test";
//        参数:队列名称、持久化与否、独占与否、无消息队列是否自动删除、消息参数
        channel.queueDeclare(queueName, true, false, false, null);

        //5 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6 设置Channel
        channel.basicConsume(queueName, false, queueingConsumer);
        while(true){
            //7 获取消息(Delivery:传送)
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            JSonObject jsonObject = new JSONObject(msg);
            String id = jsonObject.getString("id");
            String name = jsonObject.getString("name");
            String old = jsonObject.getString("old");
            Boolean isTrue = true;
            try {
                //模拟报错
                if(new Date().getTime() % 2 ==0){
                    int i = 1/0;
                }
            }catch (Exception e){
                isTrue = false;
                System.out.println("报错");
            }
            if(isTrue) {//不报错 输出控制台拿到的消息,手动ACK
                System.err.println("消费端: id=" + id + " name=" + name + " old=" + old);

                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }

消费了 3个,7个报错

查看web的RabbitMQ控制台,还有7个消息属于未ACK消息

切断消费者连接,7个未ACK消息回到Ready

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/600333.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号