package com.sky.rabbitmq.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Demo producer for a direct exchange.
 *
 * <p>Connects to the broker on localhost:5672, declares three queues and one direct
 * exchange, binds the queues to the exchange, publishes a single message with the
 * routing key {@code "order"}, and then closes the channel and the connection.
 *
 * <p>Bindings created here: {@code queue7} and {@code queue8} use routing key
 * {@code "order"}; {@code queue9} uses routing key {@code "goods"}.
 */
public class Producer {

    /**
     * Runs the demo once. Failures are reported on stdout/stderr rather than rethrown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            String message = "order来了";
            String exchangeName = "direct_message_exchange";
            String routeKey = "order";
            String exchangeType = "direct";

            // Declare the queues: durable=true, exclusive=false, autoDelete=false, no arguments.
            channel.queueDeclare("queue7", true, false, false, null);
            channel.queueDeclare("queue8", true, false, false, null);
            channel.queueDeclare("queue9", true, false, false, null);

            // Declare the exchange (durable=true).
            channel.exchangeDeclare(exchangeName, exchangeType, true);

            // Bind the queues to the exchange.
            channel.queueBind("queue7", exchangeName, routeKey);
            channel.queueBind("queue8", exchangeName, routeKey);
            channel.queueBind("queue9", exchangeName, "goods");

            // Encode explicitly as UTF-8: the payload contains non-ASCII characters and the
            // consumer decodes it as UTF-8, so the platform default charset must not be used.
            channel.basicPublish(exchangeName, routeKey, null,
                    message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送成功!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("消息发送失败!");
        } finally {
            // 7: close the channel first.
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8: then close the connection. The isOpen() guard mirrors the channel check so
            // that close() is never invoked on a connection that has already shut down.
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
消费者
- 声明队列
- 声明交换机
- 交换机绑定队列
- 发送消息
- 关闭
上面列出的是生产者(Producer)代码中的步骤;用代码操作的方式和在网页端(RabbitMQ 管理界面)操作差不多,理解即可。下面是消费者(Consumer)的代码。
package com.sky.rabbitmq.all;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
private static Runnable runnable = new Runnable() {
public void run() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.basicConsume( queueName , true , new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" +
new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("异常");
}
});
System.out.println("接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
new Thread(runnable,"queue7").start();
new Thread(runnable,"queue8").start();
new Thread(runnable,"queue9").start();
// new Thread(runnable,"queue2").start();
// new Thread(runnable,"queue3").start();
}
}
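接收消息的时候要记得开启 autoAck(即 basicConsume 的第二个参数传 true),否则消费者不会向服务器确认(ack)消息,服务器也就不会删除这些消息;如果不开启 autoAck,就需要在回调里手动调用 basicAck 来确认。如果管理页面里对应的那一栏(这里应该是指未确认消息 Unacked 一栏)有消息数量变化,那么就是没开启 autoAck。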



