一、Publish/Subscribe-消息的发布与订阅模式队列二、生产者发送消息,消息绑定交换机,交换机绑定不同的队列,消费者从队列那消息
1.生产者2.消费者: 三、测试:
开启消费者01 和02 :多了一个广播交换机:交换机绑定了两个队列:发送队列发送一条消息,两个队列都有收到:
一、Publish/Subscribe-消息的发布与订阅模式队列总结工作模式:
从结果可以看出1号消费者消费消息数量明显高于2号,即消息通过fair 机制被公平分发到每个消费者。
问题:生产者产生的消息只可以被一个消费者消费,可不可以被多个消费者消费呢?
解决:采用发布与订阅模式。
总结:工作队列-公平轮询分发-根据不同消费者机器硬件配置,消息处理速度不同,收到的消息数量也不同,通常速度快的处理的消息数量比较多,最大化使用计算机资源。适用于生成环境。
对于微信公众号,相信每个人都订阅过,当公众号发送新的消息后,对于订阅过该公众号的所有用户均可以收到消息,这个场景大家都能明白,同样对于RabbitMQ消息的处理也支持这种消息处理,当生产者把消息投送出去后,不同的消费者均可以对该消息进行消费,而不是消息被一个消费者消费后就立即从队列中删除,对于这种消息处理,我们通常称之为消息的发布与订阅模式,凡是消费者订阅了该消息,均能够收到对应消息进行处理,比较常见的如用户注册操作。模型图如下:
从图中看到:
消息产生后不是直接投送到队列中,而是将消息先投送给Exchange交换机,然后消息经过Exchange 交换机生成排他队列,投递到相关队列多个消费者消费的不再是同一个队列,而是每个消费者消费属于自己的队列。
二、生产者发送消息,消息绑定交换机,交换机绑定不同的队列,消费者从队列那消息 1.生产者排他队列:
1.基于链接可见
2.当我们关闭链接的时候,会自动删除队列
package com.xxxx.exchanges.send;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
// 定义交换机名称
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.75.100");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yeb");
factory.setVirtualHost("/yeb");
Connection connection = null;
Channel channel = null;
try {
// 通过工厂创建连接
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//创建消息
String message = "Hello World!";
// 将产生的消息放入队列
// 发送消息(交换机、队列名称、额外发送消息、消息实体)
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
// 关闭通道
if(null != channel && channel.isOpen()){
channel.close();
}
// 关闭连接
if (null != connection && connection.isOpen()){
connection.close();
}
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.消费者:
package com.xxxx.exchanges.recv;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv01 {
// 队列名称
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.75.100");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yeb");
factory.setVirtualHost("/yeb");
try {
//1. 通过工厂创建连接
Connection connection = factory.newConnection();
//2. 获取通道,创建信道
Channel channel = connection.createChannel();
//3. 绑定交换机:
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
//获取队列 (排他队列)
String queueName = channel.queueDeclare().getQueue();
//将队列和交换机进行绑定
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println(" [*] Waiting for messages. To exit pressCTRL+C");
// 获取消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列消费(队列名称、自动回值(当我的消费者收到消息后,告诉队列我收到消息了))
//false:手动去确认消息给队列
channel.basicConsume(queueName, true, deliverCallback, consumerTag
-> {
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
三、测试:
开启消费者01 和02 :多了一个广播交换机:



