一、创建连接
/**
 * Static helper that opens connections to the RabbitMQ broker.
 *
 * <p>NOTE(review): host, port, user name, password and virtual host are hard-coded below;
 * they should probably come from configuration rather than source code.
 */
public final class ConnectionUtil {

    /** Not instantiable: this class only exposes static helpers. */
    private ConnectionUtil() {
    }

    /**
     * Opens a new connection using the broker settings configured in this method.
     *
     * @return a newly created connection; the caller is responsible for closing it
     * @throws IOException if {@code ConnectionFactory.newConnection()} reports an I/O problem
     * @throws TimeoutException if {@code ConnectionFactory.newConnection()} times out
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.2.9");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }
}
二、创建生产者,消费者
// Name of the queue that both product() and customer() declare and use.
private static final String QUEUE_NAME = "hello,rabbitmq";
/**
 * Registers a consumer on {@link #QUEUE_NAME} that prints and acknowledges every message.
 *
 * <p>Once the consumer is registered, the channel and connection are deliberately left
 * open: closing them right after {@code basicConsume} tears the consumer down before any
 * delivery can reach the callback. They are closed only when setup fails. The caller gets
 * no handle to them, so they stay open until the process ends.
 *
 * <p>Setup failures are printed via {@code printStackTrace()} and not rethrown.
 *
 * @throws IOException if closing the channel or connection after a failed setup fails
 * @throws TimeoutException if closing the channel after a failed setup times out
 */
public void customer() throws IOException, TimeoutException {
    Connection connection = null;
    Channel channel = null;
    // Flipped to true once basicConsume succeeds; from then on nothing may be closed here.
    boolean consuming = false;
    try {
        System.out.println("。。。。。。。。。。");
        // Obtain the connection.
        connection = ConnectionUtil.getConnection();
        // Create the channel.
        channel = connection.createChannel();
        // The anonymous callback below can only capture a final local.
        final Channel consumerChannel = channel;
        consumerChannel.basicQos(20);
        consumerChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("customer: 开始消费 " + body);
                // Manual ack: without it the prefetch window of 20 fills up with
                // unacknowledged messages and the broker stops delivering.
                consumerChannel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                // Nothing to do when the consumer is cancelled.
            }
        };
        // autoAck is passed explicitly as false so it matches the basicAck call above.
        consumerChannel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
        consuming = true;
        System.out.println("-------over-------");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (!consuming) {
            // Setup failed: release whatever was opened. The nested finally makes sure
            // the connection is closed even if closing the channel throws.
            try {
                if (channel != null) {
                    channel.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
}
/**
 * Publishes one {@code "hello,rabbitmq!"} message to {@link #QUEUE_NAME} through the
 * default exchange, then closes the channel and connection it opened.
 *
 * @throws IOException if connecting, declaring the queue, publishing or closing fails
 * @throws TimeoutException if opening the connection or closing the channel times out
 */
public void product() throws IOException, TimeoutException {
    // try-with-resources closes the channel first, then the connection, even when
    // declaring the queue or publishing throws; previously both were leaked.
    try (Connection connection = ConnectionUtil.getConnection();
         Channel channel = connection.createChannel()) {
        // Declare (create if absent) the queue.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello,rabbitmq!";
        // Encode explicitly so the bytes do not depend on the platform default charset.
        byte[] payload = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        channel.basicPublish("", QUEUE_NAME, null, payload);
        System.out.println("发送消息: " + message);
    }
}
/**
 * Entry point: publishes one message with {@code product()} and then starts the
 * consumer with {@code customer()}. An {@code IOException} or {@code TimeoutException}
 * from either call is printed to standard error.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    try {
        RainApplicationTests app = new RainApplicationTests();
        app.product();
        app.customer();
    } catch (IOException | TimeoutException failure) {
        failure.printStackTrace();
    }
}
产生问题的原因是通道(channel)和连接(connection)被关闭了;消费者的连接应该保持为长连接,不应执行下面两行:
// The two calls identified above as the cause of the problem: they close the
// channel and the connection.
channel.close();
connection.close();



