栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

032-云E办

032-云E办

032-云E办_RabbitMQ_Publish/Subscribe-消息的发布与订阅模式队列

一、Publish/Subscribe-消息的发布与订阅模式队列二、生产者发送消息,消息绑定交换机,交换机绑定不同的队列,消费者从队列那消息

1.生产者2.消费者: 三、测试:

开启消费者01 和02 :多了一个广播交换机:交换机绑定了两个队列:发送队列发送一条消息,两个队列都有收到:

总结工作模式:
从结果可以看出1号消费者消费消息数量明显高于2号,即消息通过fair 机制被公平分发到每个消费者。
问题:生产者产生的消息只可以被一个消费者消费,可不可以被多个消费者消费呢?
解决:采用发布与订阅模式。
总结:工作队列-公平轮询分发-根据不同消费者机器硬件配置,消息处理速度不同,收到的消息数量也不同,通常速度快的处理的消息数量比较多,最大化使用计算机资源。适用于生成环境。

一、Publish/Subscribe-消息的发布与订阅模式队列

对于微信公众号,相信每个人都订阅过,当公众号发送新的消息后,对于订阅过该公众号的所有用户均可以收到消息,这个场景大家都能明白,同样对于RabbitMQ消息的处理也支持这种消息处理,当生产者把消息投送出去后,不同的消费者均可以对该消息进行消费,而不是消息被一个消费者消费后就立即从队列中删除,对于这种消息处理,我们通常称之为消息的发布与订阅模式,凡是消费者订阅了该消息,均能够收到对应消息进行处理,比较常见的如用户注册操作。模型图如下:

从图中看到:
消息产生后不是直接投送到队列中,而是将消息先投送给Exchange交换机,然后消息经过Exchange 交换机生成排他队列,投递到相关队列多个消费者消费的不再是同一个队列,而是每个消费者消费属于自己的队列。

排他队列:
1.基于链接可见
2.当我们关闭链接的时候,会自动删除队列

二、生产者发送消息,消息绑定交换机,交换机绑定不同的队列,消费者从队列那消息 1.生产者
package com.xxxx.exchanges.send;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Send {
   // 定义交换机名称
   private final static String EXCHANGE_NAME = "exchange_fanout";
   public static void main(String[] args) {
       // 创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       factory.setHost("192.168.75.100");
       factory.setPort(5672);
       factory.setUsername("yeb");
       factory.setPassword("yeb");
       factory.setVirtualHost("/yeb");

       Connection connection = null;
       Channel channel = null;
       try {
           // 通过工厂创建连接
           connection = factory.newConnection();
           // 获取通道
           channel = connection.createChannel();
           // 绑定交换机
           channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
           //创建消息
           String message = "Hello World!";

           // 将产生的消息放入队列
           // 发送消息(交换机、队列名称、额外发送消息、消息实体)
           channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
           System.out.println(" [x] Sent '" + message + "'");

       } catch (IOException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       } finally {
           try {
               // 关闭通道
               if(null != channel && channel.isOpen()){
                   channel.close();
               }
               // 关闭连接
               if (null != connection && connection.isOpen()){
                   connection.close();
               }
           } catch (TimeoutException e) {
               e.printStackTrace();
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
   }
}

2.消费者:
package com.xxxx.exchanges.recv;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Recv01 {
   // 队列名称
   private final static String EXCHANGE_NAME = "exchange_fanout";
   public static void main(String[] args) {
       // 创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       factory.setHost("192.168.75.100");
       factory.setPort(5672);
       factory.setUsername("yeb");
       factory.setPassword("yeb");
       factory.setVirtualHost("/yeb");
       try {
           //1. 通过工厂创建连接
           Connection connection = factory.newConnection();
           //2. 获取通道,创建信道
           Channel channel = connection.createChannel();

           //3. 绑定交换机:
           channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
           //获取队列 (排他队列)
           String queueName = channel.queueDeclare().getQueue();
           //将队列和交换机进行绑定
           channel.queueBind(queueName,EXCHANGE_NAME,"");


           System.out.println(" [*] Waiting for messages. To exit pressCTRL+C");

           // 获取消息
           DeliverCallback deliverCallback = (consumerTag, delivery) -> {
               String message = new String(delivery.getBody(), "UTF-8");
               System.out.println(" [x] Received '" + message + "'");
           };

           // 监听队列消费(队列名称、自动回值(当我的消费者收到消息后,告诉队列我收到消息了))
           //false:手动去确认消息给队列
           channel.basicConsume(queueName, true, deliverCallback, consumerTag
                   -> {
           });

       } catch (IOException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       }
   }
}
三、测试: 开启消费者01 和02 :多了一个广播交换机:


交换机绑定了两个队列:

发送队列发送一条消息,两个队列都有收到:


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758896.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号