1.发布订阅模式下,消息会群发给所有的消费者,同一条消息所有的消费者都可以接收到
2.交换机:fanout交换机
3.生产者:定义交换机,向交换机发送消息
4.消费者:
(1)定义交换机
(2)定义随机队列
(3)与交换机绑定
(4)接收消息
5.发布订阅模式实战
package org.example.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class Producer {
// 交换机名称
public static final String EXCHANGE_NAME="fanout_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("112.124.16.34");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
Scanner s=new Scanner(System.in);
while(s.hasNext()){
String message=s.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
}
System.out.println("发送完毕");
// 关闭通道和连接
channel.close();
// ***关闭连接***
connection.close();
}
}
package org.example.fanout;
import com.rabbitmq.client.*;
public class ReceiveLogs01 {
// 交换机的名称
public static final String EXCHANGE_NAME="fanout_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("112.124.16.34");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection= factory.newConnection();
Channel channel=connection.createChannel();
// 声明一个fanout交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 声明一个队列 非持久化,独占,自动删除
// 在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有
// 生成名称(随机)、非持久化的、独占的、自动删除的队列
String queueName=channel.queueDeclare().getQueue();
// 绑定
channel.queueBind(queueName,EXCHANGE_NAME,"");
// 接收和处理消息的回调对象
DeliverCallback deliverCallback=(consumerTag, message)->{
String mes=new String(message.getBody(),"UTF-8");
System.out.println("logs01接收到消息:"+mes);
};
// 消费者取消时的回调对象
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消费被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
// channel.close();
// ***关闭连接***
// connection.close();
}
}
package org.example.fanout;
import com.rabbitmq.client.*;
public class ReceiveLogs02 {
// 交换机的名称
public static final String EXCHANGE_NAME="fanout_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("112.124.16.34");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection= factory.newConnection();
Channel channel=connection.createChannel();
// 声明一个fanout交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 声明一个队列 非持久化,独占,自动删除
String queueName=channel.queueDeclare().getQueue();
// 绑定
channel.queueBind(queueName,EXCHANGE_NAME,"");
// 接收和处理消息的回调对象
DeliverCallback deliverCallback=(consumerTag, message)->{
String mes=new String(message.getBody(),"UTF-8");
System.out.println("logs02接收到消息:"+mes);
};
// 消费者取消时的回调对象
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消费被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
// channel.close();
// ***关闭连接***
// connection.close();
}
}
发送第二条消息的时候启动消费者2 ,只接收当前的最新消息



