栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

听说你用过RabbitMQ?那来说说RabbitMQ的几种工作模式吧......

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

听说你用过RabbitMQ?那来说说RabbitMQ的几种工作模式吧......

前言

本文我们来学习下RabbitMQ的几种工作模式,通过具体的demo实战来体会下RabbitMQ的美妙之处。

工作队列模式 简介

工作队列模式(work queue):生产者发送消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。

代码实战

新建maven工程,添加依赖:

    
            com.rabbitmq
            amqp-client
            5.9.0
        

生产者核心代码如下:

public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
       // ...省略配置uri的代码factory.setUri("amqp://root:123456@...:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

//        声明一个消息队列
        channel.queueDeclare("queue.wq", true, false, false, null);
//        声明direct交换器
        channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null);
//       将消息队列绑定到指定的交换器,并指定绑定键
        channel.queueBind("queue.wq", "ex.wq", "key.wq");

        for (int i = 0; i < 15; i++) {
            channel.basicPublish("ex.wq",
                    "key.wq", null,
                    ("工作队列:" + i).getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }
}

在生产者服务中,我们设置了一个for循环,往消息队列里发布15次消息。

消费者核心代码:

public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
       //...省略配置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare("queue.wq", true,
                false, false, null);

        channel.basicConsume("queue.wq", new DeliverCallback() {
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println("推送来的消息:" + new String(message.getBody(), "utf-8"));
            }
        }, new CancelCallback() {
            public void handle(String consumerTag) throws IOException {
                System.out.println("Cancel: " + consumerTag);
            }
        });


    }
}

同时启动三个消费者服务,一个生产者服务。

在idea里面同时启动三个服务需做如下配置:

启动成功后,我们进入rabbitmq的管理界面,可以看到我们创建的队列及交换器信息:

执行结果

控制台执行效果如下:

可以看到,rabbitmq会将消息通过近似轮询的方式分发给不同的消费者,消费者会独立消费各自获取到的消息。

发布订阅模式 简介

在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。
生产者将消息发送给交换器。交换器的作用是,从生产者接收消息,将消息推送给消息队列。交换器必须清楚地知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器类型。

发布订阅模式使用的是fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。

发布订阅模式是将消息广播给所有订阅该消息的消费者。如图所示:

代码实战

生产者核心代码:

public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
       //省略了设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 声明fanout类型的交换器
        channel.exchangeDeclare("ex.myfan", "fanout", true, false, null);

        for (int i = 0; i < 20; i++) {
            channel.basicPublish("ex.myfan",
                    "",  // fanout类型的交换器不需要指定路由键
                    null,
                    ("hello world fan:" + i).getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }

消费者核心代码:

public class OneConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
       //省略了设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

//        声明临时队列,队列的名字由RabbitMQ自动生成
        final String queueName = channel.queueDeclare().getQueue();
        System.out.println("生成的临时队列的名字为:" + queueName);

        channel.exchangeDeclare("ex.myfan",
                BuiltinExchangeType.FANOUT,
                true,
                false,
                null);

        // fanout类型的交换器绑定不需要routingkey
        channel.queueBind(queueName, "ex.myfan", "");

        channel.basicConsume(queueName, (consumerTag, message) -> {
            System.out.println("One   " + new String(message.getBody(), "utf-8"));
        }, consumerTag -> {});

    }
}

我们创建了一个名叫“ex.myfan”的fanout类型交换器,然后在消费者中将交换器与临时队列进行绑定。

在服务器上执行rabbitmqctl list_bindings --formatter pretty_table命令查看当前rabbitmq中的交换器与消息队列的绑定关系如下:

执行结果

此处我们复制三份消费者代码,模拟三个消费者服务进行测试,执行结果如下:

当我们将消费者客户端全部关闭之后,再在服务器上执行查看交换器和队列绑定关系信息的话,如下:

可以看到,队列都不在了,因为是临时的。

查看当前rabbitmq中交换器的状态:

交换器还在,因为是交换器是持久的。

注意:fanout类型的交换器的一个特点是类似于广播,当生产者发送消息的时候,如果消费者已经在线,那它能够收到消息,如果消费者当时不在线的话, 那么它在启动之后是收不到之前生产者发送的消息的。

路由模式 简介

使用 direct 类型的Exchange,发N条消费并使用不同的 routingKey ,消费者定义队列并将队列、 routingKey 、Exchange绑定。此时使用 direct 模式Exchagne必须要 routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费。

现在我们想让接收者只接收部分消息,如,我们通过直接模式的交换器将关键的错误信息记录到log文件,同时在控制台正常打印所有的日志信息。

这就是我们要说的路由模式。

如图所示,我们在消费者一端定义各自所需要接受的消息队列的类型,分别和交换器进行绑定,实现不同消费者消费不同类型的消息。

代码实战

生产者核心代码:

public class Producer {

    private final static String[] LOG_LEVEL = {
            "ERROR",
            "FATAL",
            "WARN"
    };

    private static Random random = new Random();

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
       //此处省略设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 声明direct类型的交换器,交换器和消息队列的绑定不需要在这里处理
        channel.exchangeDeclare("ex.routing", "direct", false, false, null);

        for (int i = 0; i < 100; i++) {
            String level = LOG_LEVEL[random.nextInt(100) % LOG_LEVEL.length];
            channel.basicPublish("ex.routing", level, null, ("这是【" + level + "】的消息").getBytes());
        }

    }


}

消费者核心代码(此处只展示ErrorConsumer,FatalConsumer和WarnConsumer类似):

public class ErrorConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
       //此处省略设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare("ex.routing", "direct", false, false, null);
        // 此处也可以声明为临时消息队列
        channel.queueDeclare("queue.error", false, false, false, null);

        channel.queueBind("queue.error", "ex.routing", "ERROR");

        channel.basicConsume("queue.error", ((consumerTag, message) -> {
            System.out.println("ErrorConsumer收到的消息:" + new String(message.getBody(), "utf-8"));
        }), consumerTag -> { });

    }
}

查看交换器与消息队列的绑定关系:

队列信息:

执行结果

执行结果如下:

主题模式 简介

topic,主题模式。使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符,交换器将消息路由转发到具体队列时会根据消息 routingKey 模糊匹配,比较灵活。

要想 topic 类型的交换器, routingKey 就不能随便写了,它必须得是点分单词。单词可以随便
写,生产中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。该
点分单词字符串最长255字节。
bindingKey 也必须是这种形式。 topic 类型的交换器背后原理跟 direct 类型的类似:只要队列
的 bindingKey 的值与消息的 routingKey 匹配,队列就可以收到该消息。有两个不同:

* 匹配一个单词

#匹配0到多个单词

示例:

如下图所示,我们发送描述动物的消息。消息发送的时候指定的 routingKey 包含了三个词,两个点。
第一个单词表示动物的速度,第二个是颜色,第三个是物种:..。 :

创建三个绑定:

Q1绑定到" *.orange.* "

Q2绑定到" *.*.rabbit “和” lazy.# "。

描述:

    Q1关注orange颜色动物的消息Q2关注兔子的消息,以及所有懒的动物消息如果不能匹配,就丢弃消息。如果发送的消息 routingKey 是" lazy.orange.male.rabbit ",则会匹配最后一个绑定。
代码实战

生产者核心代码如下所示:

public class Producer {

    private static final String[] LOG_LEVEL = {"info", "error", "warn"};
    private static final String[] LOG_AREA = {"beijing", "shanghai", "shenzhen"};
    private static final String[] LOG_BIZ = {"edu-online", "biz-online", "emp-online"};

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {

        final ConnectionFactory factory = new ConnectionFactory();
      //省略uri设置代码  factory.setUri("amqp://root:123456@...:5672/%2f");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare("ex.topic", "topic", true, false, null);

        String area, level, biz;

        String routingKey, message;
        for (int i = 0; i < 100; i++) {

            area = LOG_AREA[RANDOM.nextInt(LOG_AREA.length)];
            level = LOG_LEVEL[RANDOM.nextInt(LOG_LEVEL.length)];
            biz = LOG_BIZ[RANDOM.nextInt(LOG_BIZ.length)];

            // routingKey中包含了三个维度
            routingKey = area + "." + biz + "." + level;
            message = "LOG: [" + level + "] :这是 [" + area + "] 地区 [" + biz + "] 服务器发来的消息,MSG_SEQ = " + i;

            channel.basicPublish("ex.topic", routingKey, null, message.getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }
}

消费者代码(部分)如下:

public class BeijingConsumer {
    public static void main(String[] args) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
       //省略uri设置代码 factory.setUri("amqp://root:123456@...:5672/%2f");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 临时队列,返回值是服务器为该队列生成的名称
        final String queue = channel.queueDeclare().getQueue();
        channel.exchangeDeclare("ex.topic", "topic", true, false, null);
//       beijing.biz-online.error
//        只要routingKey是以beijing开头的,后面不管几个点分单词,都可以接收
        channel.queueBind(queue, "ex.topic", "beijing.#");

        channel.basicConsume(queue, (consumerTag, message) -> {
            System.out.println(new String(message.getBody(), "utf-8"));
        }, consumerTag -> {});

    }
}
执行结果

源码

源码地址:码云demo地址

总结

我们主要学习了RabbitMQ的四种工作模式,分别是工作队列模式、发布订阅模式、路由模式、主题模式,他们各自有各自的特点及使用场景。

更多

更多我亲身经历的面试真题,还有想要内推大厂的小伙伴可以联系我,请关注微信公众号:【程序员资料站】,回复关键字 “面试” 获取更多面试资料,回复“内推”,我帮你内推大厂。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711285.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号