1、生产者代码:
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、获取连接
ConnUtils cs = new ConnUtils();
Connection connection = cs.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、创建交换机
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
// 4、创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 5、绑定队列和交换机
// 队列一绑定 t1
channel.queueBind(queue1Name,exchangeName,"t1");
// 队列二绑定 t2和t3
channel.queueBind(queue2Name,exchangeName,"t2");
channel.queueBind(queue2Name,exchangeName,"t3");
// 6、发送消息
String body = "test_direct";
channel.basicPublish(exchangeName,"t1",null,body.getBytes());
// 7、释放资源
channel.close();
connection.close();
}
}
2、消费者01:
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接
ConnUtils cs = new ConnUtils();
Connection connection = cs.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、创建队列
String queue1Name = "test_direct_queue1";
// 4、接受消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
// 5、监听消息
channel.basicConsume(queue1Name,true,consumer);
}
}
3、消费者02:
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接
ConnUtils cs = new ConnUtils();
Connection connection = cs.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、创建队列
String queue2Name = "test_direct_queue2";
// 4、接受消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
// 5、监听消息
channel.basicConsume(queue2Name,true,consumer);
}
}
2、测试
1、启动生产者 2、是否只有customer01接收到了消息
消费者01控制台: body:test_direct
项目代码链接:https://github.com/Mbm7280/rabbitmq_demo



