栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ的相关操作3--预取值分发(不公平分发)

RabbitMQ的相关操作3--预取值分发(不公平分发)

目录
  • 1.预备操作
  • 2.创建工具类,让线程停顿,用来控制程序运行速度
  • 3. 编写生产者代码
  • 4. 编写消费者代码
    • 4.1 编写处理时间较短的消费者代码
    • 4.2 编写处理时间较长的消费者代码
  • 5. 运行测试代码

1.预备操作

与 RabbitMQ的相关操作2–轮训分发消息中前两步一致,引入相关依赖,并编写创建信道的工具类代码

2.创建工具类,让线程停顿,用来控制程序运行速度
public class SleepUtils {
    public static void sleep(int second){
        try {
            Thread.sleep(1000*second);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
3. 编写生产者代码
public class Task02 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //队列持久化
        boolean durable = true;
        channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
        Scanner sc = new Scanner(System.in);
        while(sc.hasNext()){
            String message = sc.next();
            //消息持久化,保存到磁盘中  MessageProperties.PERSISTENT_TEXT_PLAIN
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
            System.out.println("生产者发出消息:" + message);
        }
    }
}
4. 编写消费者代码 4.1 编写处理时间较短的消费者代码
public class Worker03 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息处理时间较短");
        //消息的接收
        DeliverCallback deliverCallback = (consumerTag, message) ->{
            SleepUtils.sleep(1);
            System.out.println("接收到的消息:"+new String(message.getBody()));
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        //设置不公平分发   等于0的时候为公平
        int prefetchCount = 1;
        //设置预取值为2 可以积压2条数据
//        int prefetchCount = 2;
        channel.basicQos(prefetchCount);

        //消息接收被取消时执行的内容
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息者取消消费接口回调逻辑");
        };
        //采用手动
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
4.2 编写处理时间较长的消费者代码
public class Worker04 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息处理时间较长");
        //消息的接收
        DeliverCallback deliverCallback = (consumerTag, message) ->{
            SleepUtils.sleep(30);
            System.out.println("接收到的消息:"+new String(message.getBody()));
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        //设置不公平分发
        int prefetchCount = 1;
        //设置预取值为5 可以积压5条数据
//        int prefetchCount = 5;
        channel.basicQos(prefetchCount);

        //消息接收被取消时执行的内容
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息者取消消费接口回调逻辑");
        };
        //采用手动
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
5. 运行测试代码

分别运行两个消费者代码

运行生产者代码,并手动输入要发送的消息

查看消费者代码


可以看到设置了预取值之后,变为了不公平分发。当prefetchCount=0时,为公平分发(默认)。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612665.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号