栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ——交换机

RabbitMQ——交换机

1. 交换机 exchange

不指定交换机直接发送到队列时,多个消费者之间存在的是竞争关系,一个消息只能被一个消费者接收,其他的消费者不能够再次接收;交换机可以绑定多个不同的队列,但是其Routingkkey是相同的,这样就可以从多个不同的队列发送相同的消息给多个消费者。

不指定交换机的情况,称为简单(工作)模式:

使用交换机的情况,称为发布、订阅模式:

在RabbitMQ中生产者不会讲消息直接发送到队列(不指定有默认交换机);交换机接收来自生产者的消息并推送给队列。

交换机类型:

    direct直接类型;topic主题类型;headers标题类型;不常用fanout扇出类型。

无名类型:使用空字符串来指定。

1.1 临时队列

临时队列是没有持久化的队列。也可以直接随机给队列起一个名字,当消费者断开连接时,队列也会自动删除。

创建随机临时队列:

String queueName = channel.queueDeclare().getQueue();
1.2 绑定 bindings

绑定就是交换机和队列之间的绑定关系。

1.3 Fanout 扇出——发布订阅模式

它将接收到的所有消息广播到它知道的所有队列中。

模拟测试场景:

    指定一个信道的交换机模式为Fanout;交换机绑定两个不同的队列;发送给两个接收不同队列消息的消费者;都接受到代表测试成功。
1.3.1 生产者
public class Producer {
    private static final String EXCHANGE_NAME = "fanout_mq";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String msg = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息:" + msg);
        }
    }
}
1.3.2 消费者
public class Worker1 {

    private static final String EXCHANGE_NAME = "fanout_mq";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        // 声明临时队列
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机和队列
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("线程A等待接收消息...");

        DeliverCallback callback = (consumerTag, message) -> {
            System.out.println("线程A接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };

        channel.basicConsume(queueName,true,callback,consumerTag -> {});

    }

}

两个消费者除了线程名字不同外没有其他区别。

1.3.3 测试结果

生产者发送:

两个消费者分别接收到:

1.4 direct 路由模式

指定RoutingKey使得不同的key接收队列的消息。

测试:

    创建两个消费者;两个消费者绑定相同的交换机和相同的队列;但是分别指定不同的RoutingKey;A 绑定 “info” ;B 绑定 “error”;使用生产者发送不同的RoutingKey,如果A和B只按照相应RoutingKey来接收则实验成功。

生产者:

public class Producer {
    private static final String EXCHANGE_NAME = "direct_mq";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        Scanner scanner = new Scanner(System.in);

        int i = 1;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next();
            if ( i % 2 == 0){
                channel.basicPublish(EXCHANGE_NAME,"info",null,msg.getBytes(StandardCharsets.UTF_8));
            }else {
                channel.basicPublish(EXCHANGE_NAME,"error",null,msg.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("发送消息:" + msg);
        }
    }
}

消费者A:

public class Worker1 {

    private static final String EXCHANGE_NAME = "direct_mq";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 声明一个队列
        channel.queueDeclare("console",false,false,false,null);
        // 绑定交换机和队列
        channel.queueBind("console",EXCHANGE_NAME,"info");
        System.out.println("线程A等待接收消息...");

        DeliverCallback callback = (consumerTag, message) -> {
            System.out.println("线程A接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };

        channel.basicConsume("console",true,callback,consumerTag -> {});

    }

}

消费者B:
修改名称和指定RoutingKey为error。

测试结果:


1.3 topic 主题模式

上方路由模式中无法同时发送多个队列,在当前topic模式中可以指定多个RoutingKey中间使用英文句号隔开。通配符*代替一个单词;#代表零个或多个单词。

生产者:

public class Producer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        
        Map bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
        for (Map.Entry bindingKeyEntry : bindingKeyMap.entrySet()) {
            String bindingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();

            channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
        }
    }
}

消费者1:

public class Worker1 {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明 Q1 队列与绑定关系
        String queueName = "Q1";
        //声明
        channel.queueDeclare(queueName, false, false, false, null);
        //绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接收消息........... ");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接收队列:" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }

}

消费者2

public class Worker2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明 Q2 队列与绑定关系
        String queueName = "Q2";
        //声明
        channel.queueDeclare(queueName, false, false, false, null);
        //绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

        System.out.println("等待接收消息........... ");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接收队列:" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761675.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号