栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ的多个工作模式总结

RabbitMQ的多个工作模式总结

RabbitMQ的多个工作模式总结

对照官网总结
https://www.rabbitmq.com/getstarted.html

  1. 简单模式

一个生产、一个消费,不用指定交换机,使用默认交换机

  1. 工作队列模式

一个生产,多个消费,可以有轮训和公平策略,不用指定交换机,使用默认交换机

  1. 发布订阅模式

fanout类型交换机,通过交换机和队列绑定,不用指定绑定的路由健,生产者发送消息到交换机,fanout交换机直接进行转发,消息不用指定routingkey路由健

  1. 路由模式

direct类型交换机,过交换机和队列绑定,指定绑定的路由健,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息要指定routingkey路由健

  1. 通配符模式

topic交换机,过交换机和队列绑定,指定绑定的【通配符路由健】,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息要指定routingkey路由健

RabbitMQ的发布订阅消息模型代码

生产者

public class Send {
​
    private final static String EXCHANGE_NAME = "exchange_fanout";
​
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.13");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
​
        
        try (//创建连接
             Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {
​
            //绑定交换机,fanout扇形,即广播类型
            channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
​
            String message = "Hello World pub !";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
​
        }
    }
}

消费端(两个节点)

public class Recv1 {
​
    private final static String EXCHANGE_NAME = "exchange_fanout";
​
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.13");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
​
        //消费者一般不增加自动关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
​
        //绑定交换机,fanout扇形,即广播类型
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
​
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
​
        //绑定队列和交换机,fanout交换机不用指定routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"");
​
​
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
​
        //自动确认消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
​
    }
}

验证:启动两个消费者,一个生产者发送消息

二、RabbitMQ的主题模式和应用场景

背景:
如果业务很多路由key,怎么维护??
topic交换机,支持通配符匹配模式,更加强大
工作基本都是用这个topic模式
什么是rabbitmq的主题模式
文档 https://www.rabbitmq.com/tutorials/tutorial-five-java.html

  • 交换机是 topic, 可以实现发布订阅模式fanout和路由模式Direct 的功能,更加灵活,支持模式匹配,通配符等
  • 交换机同过通配符进行转发到对应的队列,* 代表一个词,#代表1个或多个词,一般用#作为通配符居多,比如 #.order, 会匹配
    info.order 、sys.error.order, 而 *.order ,只会匹配 info.order, 之间是使用.
    点进行分割多个词的; 如果是 ., 则info.order、error.order都会匹配

注意
交换机和队列绑定时用的binding使用通配符的路由健
生产者发送消息时需要使用具体的路由健
测试,下面的匹配规则是怎样的

quick.orange.rabbit 只会匹配 .orange...rabbit ,进到Q1和Q2
lazy.orange.elephant 只会匹配 .orange. 和 lazy.#,进到Q1和Q2 quick.orange.fox
只会匹配 .orange.,进入Q1 lazy.brown.fox 只会匹配azy.#,进入Q2 lazy.pink.rabbit
只会匹配 lazy.#和*.*.rabbit ,同个队列进入Q2(消息只会发一次) ​ quick.brown.fox
没有匹配,默认会被丢弃,可以通过回调监听二次处理 ​ lazy.orange.male.rabbit,只会匹配 lazy.#,进入Q2


例子:日志采集系统

  • 一个队列收集订单系统的全部日志信息,order.log.#
  • 一个队列收集全部系统的全部日志信息, #.log

生产者

public class Send {
​
    private final static String EXCHANGE_NAME = "exchange_topic";
​
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.13");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
​
        
​
        try (//创建连接
             Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {
​
            //绑定交换机,直连交换机
            channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
​
            String error = "我是订单错误日志";
            String info = "我是订单info日志";
            String debug = "我是商品debug日志";
            channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));
​
            System.out.println("消息发送成功");
​
        }
    }
}

消费者(两个)

public class Recv1 {
​
    private final static String EXCHANGE_NAME = "exchange_topic";
​
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.13");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
​
        //消费者一般不增加自动关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
​
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
​
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
​
        //绑定队列和交换机,第一个节点
        //channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");
​
      //绑定队列和交换机,第二个节点
      //channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*");
​
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
​
        //自动确认消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
​
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/457722.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号