栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

工作队列模型

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

工作队列模型

Work queues

Distributing tasks among workers (the competing consumers pattern)

一 轮询模式

当生产者生产能力过高时,需要引入多个消费者来消费消息,默认这些消费者采用轮询的方式消费消息,不管消费者的消费速度如何,消费者消费的消息数量贴近于一致。

生产者发送多条消息

package com.tech.rabbitmq.nospring.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;


public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.50.134");
        factory.setPort(5672);
        factory.setVirtualHost("/dev");
        factory.setUsername("tech");
        factory.setPassword("tech");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //队列名称、是否持久化、是否独占(只能有一个消费者)、是否自动删除(在没有消费者时自动删除)、其他参数
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            for (int i = 0; i < 10; i++) {
                String message = "Hello World! "+i;
                //交换机名称(不写为默认交换机,路由键名称与队列名称一致才能被路由)、路由键名称、配置信息、字节数组
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}
有两个消费者,一个消费者是2秒消费一条消息,一个消费者6秒消费一条消息
package com.tech.rabbitmq.nospring.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class Recv1 {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.50.134");
        factory.setPort(5672);
        factory.setVirtualHost("/dev");
        factory.setUsername("tech");
        factory.setPassword("tech");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //一般是固定的,可以作为会话的名称
//                System.out.println("consumerTag="+consumerTag);
                //可以获取交换机、路由键等信息
//                System.out.println("envelope="+envelope);
//                System.out.println("properties="+properties);
                System.out.println("body="+new String(body,"utf-8"));
                //手动确认消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //false 关闭自动确认
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}
package com.tech.rabbitmq.nospring.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class Recv2 {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.50.134");
        factory.setPort(5672);
        factory.setVirtualHost("/dev");
        factory.setUsername("tech");
        factory.setPassword("tech");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(6);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //一般是固定的,可以作为会话的名称
//                System.out.println("consumerTag="+consumerTag);
                //可以获取交换机、路由键等信息
//                System.out.println("envelope="+envelope);
//                System.out.println("properties="+properties);
                System.out.println("body="+new String(body,"utf-8"));
                //手动确认消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //false 关闭自动确认
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

启动两个消费者,启动发送者,得到如下结果:

两个消费者消费的消息数一样;
RabbitMQ轮询着给各个消费者逐一发送消息到消费者本地,消费者各自处理本地的消息,由于分配的数量是一样的,消费速度快的消费者会优先消费完进入空闲状态

二 能者多劳模式

        可以通过basicQos设置RabbitMQ每次给消费者发送1条消息,等这条消息手动Ack后才发送下1条消息,这样消费慢的消费者没有ack消息,RabbitMQ不会发送下一条消息给这个消费慢的消费者,而是给到了已经完成消息Ack消费速度快的消费者,实现能者多劳。

接收端需要修改代码

 int fetchCount = 1;
        //设置RabbitMQ给消费者发送fetchCount条消息后,需要等待有消息确认时,再继续给该消费者发送消息,
        // 发送的消息数量为消费者fetchCount-当前未消费者未ack的消息数
        //当fetchCount为1时,则RabbitMQ给消费者发送1条消息,等待这条消息ack后才会发下1条消息
        channel.basicQos(fetchCount);

完整代码如下:

package com.tech.rabbitmq.nospring.work.qos;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class Recv1 {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.50.134");
        factory.setPort(5672);
        factory.setVirtualHost("/dev");
        factory.setUsername("tech");
        factory.setPassword("tech");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        int fetchCount=1;
        //设置RabbitMQ给消费者发送fetchCount条消息后,需要等待有消息确认时,再继续给该消费者发送消息,
        // 发送的消息数量为消费者fetchCount-当前未消费者未ack的消息数
        //当fetchCount为1时,则RabbitMQ给消费者发送1条消息,等待这条消息ack后才会发下1条消息
        channel.basicQos(fetchCount);

        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //一般是固定的,可以作为会话的名称
//                System.out.println("consumerTag="+consumerTag);
                //可以获取交换机、路由键等信息
//                System.out.println("envelope="+envelope);
//                System.out.println("properties="+properties);
                System.out.println("body="+new String(body,"utf-8"));
                //手动确认消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //false 关闭自动确认
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}
package com.tech.rabbitmq.nospring.work.qos;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class Recv2 {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.50.134");
        factory.setPort(5672);
        factory.setVirtualHost("/dev");
        factory.setUsername("tech");
        factory.setPassword("tech");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        int fetchCount = 1;
        //设置RabbitMQ给消费者发送fetchCount条消息后,需要等待有消息确认时,再继续给该消费者发送消息,
        // 发送的消息数量为消费者fetchCount-当前未消费者未ack的消息数
        //当fetchCount为1时,则RabbitMQ给消费者发送1条消息,等待这条消息ack后才会发下1条消息
        channel.basicQos(fetchCount);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //一般是固定的,可以作为会话的名称
//                System.out.println("consumerTag="+consumerTag);
                //可以获取交换机、路由键等信息
//                System.out.println("envelope="+envelope);
//                System.out.println("properties="+properties);
                System.out.println("body=" + new String(body, "utf-8"));
                //手动确认消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //false 关闭自动确认
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/348995.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号