导入maven依赖
首先在Win com.rabbitmq amqp-client 5.12.0
dows 或者 Linux开启rabbitMQ-Server服务,再进行java代码集成。
1. 简单模式-
编辑Util类
private static final String HOST = "192.168.43.17"; // 设置IP地址 private static final String VIRTUALHOST = "/"; // 虚拟主机 private static final String USERNAME = "guest"; // 用户名 private static final String PASSWORD = "guest"; // 密码 private static final Integer PORT = 5672; // rabbitmq-server 端口 public Connection getConnection () { Connection connection = null; // 获取链接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setVirtualHost(VIRTUALHOST); connectionFactory.setUsername(USERNAME); connectionFactory.setPassword(PASSWORD); // rabbitmq 的服务器地址 15672:给rabbitmq management web程序,插件 web端客户端管理工具 //5672:给rabbitmq-server 服务器的 connectionFactory.setPort(PORT); // 建立链接 try { Connection newConnection = connectionFactory.newConnection(); connection = newConnection; } catch (IOException e) { System.out.println("连接失败!!!"); e.printStackTrace(); } catch (TimeoutException e) { System.out.println("连接超时!!!"); e.printStackTrace(); } return connection; } -
编写消息队列发布者
public static void main(String[] args) throws IOException, TimeoutException { Connection getConnection = new ConnectionUtil().getConnection(); // 创建频道 Channel channel = getConnection.createChannel(); // 声明队列 (队列名称 是否可持久化 是否独占 是否自动删除 配置其他参数) channel.queueDeclare("zjw_test", true, false, false, null); // 消息发布 (交换机名 路由名 其他属性 消息的字节数组) channel.basicPublish("","zjw_test",null,"Hello Word!!!".getBytes()); // 关闭连接 getConnection.close(); } -
编写消息队列消费者
public static void main(String[] args) throws IOException { // 获取连接 Connection getConnection = new ConnectionUtil().getConnection(); // 创建频道 Channel channel = getConnection.createChannel(); // 声明队列 channel.queueDeclare("zjw_test",true,false,false,null); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume("zjw_test", true, deliverCallback, consumerTag -> {}); }
消费者不做关闭连接的操作 同时运行在后台 再启动发布者
总结:工作模式就是多人同时消费一个队列(并且队列会平均分配给消费者)
多了声明交换机和绑定交换机的操作
- 发布者
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = new ConnectionUtil().getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列 (队列名称 是否持久化 是否独占队列 是否自动删除 其他属性)
channel.queueDeclare("fanout_message1",true,false,false,null);
channel.queueDeclare("fanout_message2",true,false,false,null);
// 声明交换机 (交换机名 类型)
channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
// 交换机绑定队列 (队列名 交换机名 路由)
channel.queueBind("fanout_message1","fanout_exchange","");
channel.queueBind("fanout_message2","fanout_exchange","");
// 发布消息
String message = "消息队列:发布/订阅模式";
// channel.basicPublish("fanout_exchange","",null,message.getBytes());
for (int i = 0; i < 10; i++) {
channel.basicPublish("fanout_exchange","",null,message.getBytes());
}
// 关闭资源
channel.close();
connection.close();
}
- 消费者
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = new ConnectionUtil().getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("fanout_message1",true,false,false,null);
// 声明函数
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
// 如果之前没有指定,则会自动生成一个消费者标签,对消费者进行唯一标识
System.out.println("consumerTag:"+consumerTag);
// 可以通过envelope获取exchange和routingkey信息
System.out.println("exchange:"+envelope.getExchange()+";routingkey:"+envelope.getRoutingKey()
+";deliveryTag:"+envelope.getDeliveryTag());
// 打印消息
System.out.println("fanout_message1:" + new String(body,"UTF-8"));
}
};
// 消费队列 (队列名 是否自动化 函数名)
channel.basicConsume("fanout_message1",true,defaultConsumer);
}
总结:发布/订阅模式指的是将消息平均发布到交换机里(消费者可根据队列名消费分配到的队列消息)
4. 路由模式填写了routingKey路由名称
- 消息队列发布者
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = new ConnectionUtil().getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("routing_message1",true,false,false,null);
channel.queueDeclare("routing_message2",true,false,false,null);
// 声明交换机 (交换机名 类型)
channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);
// 绑定交换机 (队列名 交换机名 路由)
channel.queueBind("routing_message1","routing_exchange","insert");
channel.queueBind("routing_message2","routing_exchange","update");
channel.queueBind("routing_message1","routing_exchange","delete");
channel.queueBind("routing_message2","routing_exchange","select");
// 发布消息
channel.basicPublish("routing_exchange","insert",null,"路由模式insert".getBytes());
channel.basicPublish("routing_exchange","update",null,"路由模式update".getBytes());
channel.basicPublish("routing_exchange","delete",null,"路由模式delete".getBytes());
channel.basicPublish("routing_exchange","select",null,"路由模式select".getBytes());
// 关闭资源
channel.close();
connection.close();
}
- 消息队列消费者
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = new ConnectionUtil().getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("routing_message1",true,false,false,null);
// defautl函数
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
System.out.println("消费1:"+new String(body,"utf-8"));
}
};
// 消费队列
channel.basicConsume("routing_message1",defaultConsumer);
}
总结:路由模式是指 将消息发布在不同的交换机并且在交换机分配不同的路由空间
5. 主题模式基于路由模式 比路由模式更实用(推荐)
- 发布者
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = new ConnectionUtil().getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("topic_message1",true,false,false,null);
// 声明交换机
channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
// 绑定交换机 (队列名 交换机名 路由名(#代表下面的所有 *代表下一个目录))
channel.queueBind("topic_message1","topic_exchange","item.*");
channel.queueBind("topic_message2","topic_exchange","menu.#");
// 发布消息
channel.basicPublish("topic_exchange","item.delete",null,"商品的删除".getBytes());
channel.basicPublish("topic_exchange","item.update",null,"商品的修改".getBytes());
channel.basicPublish("topic_exchange","item.insert.del",null,"商品的新增下面的删除".getBytes());
channel.basicPublish("topic_exchange","menu.system",null,"菜单的系统设置".getBytes());
channel.basicPublish("topic_exchange","menu.system.sel",null,"菜单的系统设置下的查询".getBytes());
channel.basicPublish("topic_exchange","menu.system.info",null,"菜单的系统设置下的个人信息".getBytes());
// 关闭资源
channel.close();
connection.close();
}
- 消费者
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = new ConnectionUtil().getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("topic_message1",true,false,false,null);
// 默认函数
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
System.out.println("消费1:"+new String(body,"utf-8"));
}
};
// 消费队列
channel.basicConsume("topic_message1",defaultConsumer);
}
参考项目点击获取连接



