栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ学习==》路由绑定

RabbitMQ学习==》路由绑定

文章目录

工作路由

1、绑定2、直接交换

单个绑定:多个绑定: 3、示例

发送消息:接收消息:

工作路由 1、绑定

​ 绑定是交换和队列之间的关系,绑定可以采用额外的routingKey参数

channel.queueBind(queueName, EXCHANGE_NAME, "black" );

绑定键的含义取决于交换类型

2、直接交换 单个绑定:

​ 直接交换背后的路由算法很简单——消息进入 绑定键与消息的 路由键完全匹配的队列

在这个设置中,我们可以看到绑定了两个队列的直接交换X。第一个队列使用绑定键orange进行绑定,第二个队列有两个绑定,一个使用绑定键black,另一个使用green。

在这样的设置中,使用路由键orange发布到交换的消息 将被路由到队列Q1。带有black 或 green路由键的消息将发送到Q2。所有其他消息将被丢弃

多个绑定:

使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键black在X和Q1之间添加绑定。在这种情况下,直接交换的行为类似于扇出并将消息广播到所有匹配的队列。带有路由键black的消息将被传送到 Q1和Q2

3、示例 发送消息:
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        argv = new String[]{"info"};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.6.128");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            String severity = getSeverity(argv);
            String message = getMessage(argv);
            // 发布消息,第一个参数为交换器名,第二个参数为路由键名
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }
    private static String getSeverity(String[] strings) {
        if (strings.length < 1)
            return "info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length <= startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
接收消息:
	private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        argv = new String[]{"info", "warning", "error"};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.6.128");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for (String severity : argv) {
            // 将队列与交换器绑定,指定队列名、交换器、路由键名
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780937.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号