生产者:直连模式
package com.huixiang.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Productor {
public static void main(String[] args) {
//所有中间件都是基于tcp/ip协议
//创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置ip,port,账号,密码,虚拟访问节点
connectionFactory.setHost("");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
// 虚拟主机名
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
//获取创建连接获取通道
try {
connection = connectionFactory.newConnection("生产者");
channel = connection.createChannel();
//创建交换机,队列,绑定关系,路由key,发送消息接受消息
//声明队列
String queueName = "queue1";
channel.queueDeclare(queueName,false,false,false,null);
//准备消息内容
String info = "nihao !";
//发送消息到队列
//交换机,队列名或路由key,消息状态控制(是否持久化),信息
//交换机默认,不可能存在没有交换机的对列
channel.basicPublish("",queueName,null,info.getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
//关闭通道
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
//关闭连接
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
消费者:直连
package com.huixiang.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) {
//所有中间件都是基于tcp/ip协议
//创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置ip,port,账号,密码,虚拟访问节点
connectionFactory.setHost("");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
//获取创建连接获取通道
try {
connection = connectionFactory.newConnection("生产者");
channel = connection.createChannel();
channel.queueDeclare("queue1",false,false,false,null);
//接受消息
channel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到消息" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("失败");
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
// 关闭通道(不建议关闭)
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
//关闭连接
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
代码冗余,提取工具类
package com.huixiang.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitmqUtils {
// 工厂是重量级资源 最好初始化就加载 加载一次
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
//设置ip,port,账号,密码,虚拟访问节点
connectionFactory.setHost("");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
// 虚拟主机名
connectionFactory.setVirtualHost("/");
}
//定义提供连接对象的方法
public static Connection setConnection(){
try {
//返回连接
return connectionFactory.newConnection();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
//关闭连接
public static void closeConnectionChanel(Channel channel,Connection connection) {
try {
if (channel != null) channel.close();
if (connection != null) connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
// 生产者简化 消费者同理
// 化简
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
//获取创建连接获取通道
try {
connection = RabbitmqUtils.setConnection();
// 创建通道
channel = connection.createChannel();
//创建交换机,队列,绑定关系,路由key,发送消息接受消息
//声明队列
String queueName = "queue1";
channel.queueDeclare(queueName,false,false,false,null);
//准备消息内容
String info = "nihao !";
//发送消息到队列
//交换机,队列名或路由key,消息状态控制(是否持久化),信息
//交换机默认,不可能存在没有交换机的对列
channel.basicPublish("",queueName,null,info.getBytes());
} catch (IOException e) {
e.printStackTrace();
} finally {
RabbitmqUtils.closeConnectionChanel(channel,connection);
}
}