1.发布订阅模式下,我们构建了一个简单的日志记录系统,我们能够向许多接收者广播日志消息。而在路由模式下,我们将向其中添加一些特别的功能,比如说我们只让某个消费者订阅发布的部分消息,例如我们只把严重错误的消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息
2.Fanout 这种交换机类型并不能给我们带来很大的灵活性,它只能进行无意识的广播,在这里我们将使用 direct 这种交换机类型来进行替换,这种交换机类型的工作方式是消息只去到它绑定的 routingKey 队列中去
在这种绑定情况下,生产者发布消息到 交换机上,绑定键为 orange 的消息会被发布到队列 Q1,绑定键为 black、green的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃
3.实战演示
package org.example.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
// 交换机名称
public static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("112.124.16.34");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
String message="正常日志消息!";
channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
String message1="警告消息!";
channel.basicPublish(EXCHANGE_NAME,"warning",null,message1.getBytes("UTF-8"));
String message2="错误消息!";
channel.basicPublish(EXCHANGE_NAME,"error",null,message2.getBytes("UTF-8"));
System.out.println("发送完毕");
// 关闭通道和连接
channel.close();
// ***关闭连接***
connection.close();
}
}
package org.example.direct;
import com.rabbitmq.client.*;
public class ReceiveLogs01 {
// 交换机的名称
public static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("112.124.16.34");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection= factory.newConnection();
Channel channel=connection.createChannel();
// 声明一个direct交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
// 声明一个队列
channel.queueDeclare("console",false,false,true,null);
// 绑定
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
// 接收和处理消息的回调对象
DeliverCallback deliverCallback=(consumerTag, message)->{
String mes=new String(message.getBody(),"UTF-8");
System.out.println("logs01接收到消息:"+mes);
};
// 消费者取消时的回调对象
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消费被中断");
};
channel.basicConsume("console",true,deliverCallback,cancelCallback);
// channel.close();
// ***关闭连接***
// connection.close();
}
}
package org.example.direct;
import com.rabbitmq.client.*;
public class ReceiveLogs02 {
// 交换机的名称
public static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("112.124.16.34");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection= factory.newConnection();
Channel channel=connection.createChannel();
// 声明一个direct交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
// 声明一个队列
channel.queueDeclare("disk",false,false,true,null);
// 绑定
channel.queueBind("disk",EXCHANGE_NAME,"error");
// 接收和处理消息的回调对象
DeliverCallback deliverCallback=(consumerTag, message)->{
String mes=new String(message.getBody(),"UTF-8");
System.out.println("logs02接收到消息:"+mes);
};
// 消费者取消时的回调对象
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消费被中断");
};
channel.basicConsume("disk",true,deliverCallback,cancelCallback);
// channel.close();
// ***关闭连接***
// connection.close();
}
}



