栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

033-云E办

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

033-云E办

033-云E办_RabbitMQ_Routing-路由模式队列

一、direct路由模式简介:二、根据路由key,指定发放订阅队列:

send:消费者 三、测试:

开启消费者01 和02

发布订阅——广播模式:从结果可以看出生产者发送了一条消息,用于邮件发送和短信发送的消费者均可以收到消息进行后续处理。

问题:生产者产生的消息所有消费者都可以消费,可不可以指定某些消费者消费呢?
解决:采用direct路由模式。

一、direct路由模式简介:

通过案例03,可以看到,生产者将消息投送给交换机后,消息经交换机分发到不同的队列即:交换机收到消息,默认对于绑定到每个交换机的队列均会接收到交换机分发的消息,对于案例03的交换机的消息分发Exchange Types为 fanout 类型,通常在真正项目开发时会遇到这种情况:在对项目信息输出日志进行收集时,会把日志(error warning,info)分类进行输出,这时通过Exchange Types中的 direct 类型,就可以实现,针对不同的消息,在对消息进行消费时,通过 Exchange types 以及 Routing key 设置的规则 ,便可以将不同消息路由到不同的队列中然后交给不同消费者进行消费操作。模型图如下:

    生产者产生的消息投给交换机交换机投送消息时的Exchange Types为direct类型消息通过定义的Routing Key被路由到指定的队列进行后续消费
二、根据路由key,指定发放订阅队列:

消费者对消息进行后续消费时,对于接收消息的队列在对消息进行接收时,绑定到每一个交换机上的队列均会指定其Routing Key规则,通过路由规则将消息路由到执行队列中。

send:
package com.xxxx.direct.send;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Send {
    // 定义交换机名称
    private final static String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.75.100");
        factory.setPort(5672);
        factory.setUsername("yeb");
        factory.setPassword("yeb");
        factory.setVirtualHost("/yeb");

        Connection connection = null;
        Channel channel = null;
        try {
            // 通过工厂创建连接
            connection = factory.newConnection();
            // 获取通道
            channel = connection.createChannel();
            // 绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //创建消息
            String infoMessage = "普通信息";
            String errorMessage = "错误信息";
            String warningMessage = "警告信息";
            //准备路由
            String infoRoutingKey = " info";
            String errorRoutingKey = " error";
            String warningRoutingKey = " warning";
            // 将产生的消息放入队列
            // 发送消息(交换机、队列名称、额外发送消息、消息实体)
            // 发送不同的消息,携带了不同的路由key
            channel.basicPublish(EXCHANGE_NAME, infoRoutingKey, null, infoMessage.getBytes("UTF-8"));
            channel.basicPublish(EXCHANGE_NAME, errorRoutingKey, null, errorMessage.getBytes("UTF-8"));
            channel.basicPublish(EXCHANGE_NAME, warningRoutingKey, null, warningMessage.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + infoMessage + "'");
            System.out.println(" [x] Sent '" + errorMessage + "'");
            System.out.println(" [x] Sent '" + warningMessage + "'");

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
                // 关闭通道
                if(null != channel && channel.isOpen()){
                    channel.close();
                }

                // 关闭连接
                if (null != connection && connection.isOpen()){
                    connection.close();
                }
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者

消费者对消息进行后续消费时,对于接收消息的队列在对消息进行接收时,绑定到每一个交换机上的队列均会指定其Routing Key规则,通过路由规则将消息路由到执行队列中。

package com.xxxx.direct.recv;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Recv01 {
    // 队列名称
    private final static String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.75.100");
        factory.setPort(5672);
        factory.setUsername("yeb");
        factory.setPassword("yeb");
        factory.setVirtualHost("/yeb");
        try {
            //1. 通过工厂创建连接
            Connection connection = factory.newConnection();
            //2. 获取通道,创建信道
            Channel channel = connection.createChannel();

            //3. 绑定交换机:
            channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
            //获取队列 (排他队列)
            String queueName = channel.queueDeclare().getQueue();

            //4. 绑定队列: 只接受队列中的错误信息。
            //将队列和交换机进行绑定

            String errorRoutingKey = " error";

            channel.queueBind(queueName,EXCHANGE_NAME,errorRoutingKey);

            System.out.println(" [*] Waiting for messages. To exit pressCTRL+C");

            // 获取消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };

            // 监听队列消费(队列名称、自动回值(当我的消费者收到消息后,告诉队列我收到消息了))
            //false:手动去确认消息给队列
            channel.basicConsume(queueName, true, deliverCallback, consumerTag
                    -> {
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

三、测试: 开启消费者01 和02

测试中:
01只能消费“错误消息”
02能消费3个消息:info /warning / error



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/755747.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号