RabbitMQ发布订阅模式
相关视频参考(来自动力节点):https://www.bilibili.com/video/BV1Ap4y1D7tU
相关资料下载:http://www.bjpowernode.com/?csdn
在前面的例子中,我们任务消息只交付给一个工作进程。在这部分,我们将做一些完全不同的事情——我们将向多个消费者传递同一条消息。这种模式称为“发布/订阅”。
生产者
生产者发出日志消息,看起来与前一教程没有太大不同。最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。
package rabbitmq.publishsubscribe;
import java.util.Scanner;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Test1 {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//定义名字为logs的交换机,交换机类型为fanout
//这一步是必须的,因为禁止发布到不存在的交换。
ch.exchangeDeclare("logs", "fanout");
while (true) {
System.out.print("输入消息: ");
String msg = new Scanner(System.in).nextLine();
if ("exit".equals(msg)) {
break;
}
//第一个参数,向指定的交换机发送消息
//第二个参数,不指定队列,由消费者向交换机绑定队列
//如果还没有队列绑定到交换器,消息就会丢失,
//但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。
ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
System.out.println("消息已发送: "+msg);
}
c.close();
}
}
消费者
如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。
ReceiveLogs.java代码:
package rabbitmq.publishsubscribe;
import java.io.IOException;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Test2 {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//定义名字为 logs 的交换机, 它的类型是 fanout
ch.exchangeDeclare("logs", "fanout");
//自动生成对列名,
//非持久,独占,自动删除
String queueName = ch.queueDeclare().getQueue();
//把该队列,绑定到 logs 交换机
//对于 fanout 类型的交换机, routingKey会被忽略,不允许null值
ch.queueBind(queueName, "logs", "");
System.out.println("等待接收数据");
//收到消息后用来处理消息的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: "+msg);
}
};
//消费者取消时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
ch.basicConsume(queueName, true, callback, cancel);
}
}



