rabbitmq入门简介 : https://blog.csdn.net/kavito/article/details/91403659
rabbitmq安装教程: https://blog.csdn.net/weixin_42673046/article/details/118442323
使用Java连接rabbitmqrabbitmq入门介绍以及安装教程网上有过很好的讲解,这就不重复造轮子了,直奔主题。记录使用java进行连接rabbitmq以及springboot集成rabbitmq中遇到的问题
导入依赖:
com.rabbitmq amqp-client 5.9.0 commons-io commons-io 2.6
编写rabbitmqconfigutils配置类,把mq的连接操作提取出来,代码解耦
public class RabbitmqUtils {
private static Channel channel = null;
private static Connection connection = null;
public static Channel getChannel(){
//定义连接池
ConnectionFactory factory = new ConnectionFactory();
//设置主机地址
factory.setHost("192.168.81.131");
//设置端口
factory.setPort(5672);
//设置用户名
factory.setUsername("admin");
//密码
factory.setPassword("admin");
//虚拟机路径
factory.setVirtualHost("/");
try {
connection = factory.newConnection();
//创建信道
channel = connection.createChannel();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return channel;
}
//关闭连接
public static void closeConnection(){
try {
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
发布订阅模式rabbitmq有六种工作模式分别是:简单队列、work模式、发布/订阅模式、路由模式、主题模式还有个rpc模式,
下面主要记录发布订阅工作模式、路由工作模式、topic主题工作模式
无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者。生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
//利用写好工具类获取信道连接
Channel channel = RabbitmqUtils.getChannel();
try {
channel.exchangeDeclare("text_pubsub", BuiltinExchangeType.FANOUT,false,false,false,null);
channel.queueDeclare("pubsub_queue1",false,false,false,null);
channel.queueDeclare("pubsub_queue2",false,false,false,null);
channel.queueBind("pubsub_queue1","text_pubsub","");
channel.queueBind("pubsub_queue2","text_pubsub","");
String msg = "发布订阅模式!!!";
channel.basicPublish("text_pubsub","",null,msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
}finally {
RabbitmqUtils.closeConnection();
}
运行测试对比rabbitmq界面管理(代码若没报错创建成功,可通过ip地址+15672访问web界面查看)
查看队列是否成功创建
查看是否成功创建交换机
消息消费者Channel channel = RabbitmqUtils.getChannel();
try {
channel.basicConsume("pubsub_queue1",true,(s, delivery) ->{
System.out.println( new String(delivery.getBody())+"存到数据库");
} ,consumerTag-> System.out.println("取消消费"));
} catch (IOException e) {
e.printStackTrace();
}
运行消费者代码再次访问web控制界面看到其中pubsub_queue1还消息已经被消费
路由模式路由模式生产者指定路由发送数据,消费者绑定路由接受数据。与发布/订阅模式不同的是,发布/订阅模式只要是绑定了交换机的队列都会收到生产者向交换机推送过来的数据。而路由模式下加了一个路由设置,生产者向交换机发送数据时,会声明发送给交换机下的那个路由,并且只有当消费者的队列绑定了交换机并且声明了路由,才会收到数据
消息生产者Channel channel = RabbitmqUtils.getChannel();
//这些模式的编码都没有很大的差别,只是在原有的基础上去添加配置代码
try {
//创建交换机 为DIRECT模式
channel.exchangeDeclare("text_routing", BuiltinExchangeType.DIRECT,false,false,false,null);
//创建路由
channel.queueDeclare("routing_queue1",false,false,false,null);
channel.queueDeclare("routing_queue2",false,false,false,null);
//将交换机与队列进行绑定设置routerkey
channel.queueBind("routing_queue1","text_routing","error");
channel.queueBind("routing_queue2","text_routing","info");
channel.queueBind("routing_queue2","text_routing","error");
String msg = "路由模式!!!";
//向text_routing里发送消息routerkey为error,只有路由key为error的队列方可接收到消息
channel.basicPublish("text_routing","error",null,msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
}finally {
RabbitmqUtils.closeConnection();
}
查看界面消息
消息消费者Channel channel = RabbitmqUtils.getChannel();
try {
//路由key error绑定的是routing_queue1 和 routing_queue2我们访问任一个都可以接收到消息
//但当routerkey为info时在访问queue1时就接收不到数据了,业务队列1绑定的仅有error
channel.basicConsume("routing_queue1",true,(s, delivery) ->{
System.out.println( new String(delivery.getBody())+"存到数据库");
} ,consumerTag-> System.out.println("取消消费"));
} catch (IOException e) {
e.printStackTrace();
}
主题模式
主题模式可以简单的理解为可以动态路由,*代表一个单词,#可以代替零个或多个单词,单词最多 255 个字节,通过相关的匹配规则后就会将满足条件的消息放到对应的队列中,每个单词之间要用点隔开
生产者 Channel channel = RabbitmqUtils.getChannel();
try {
channel.exchangeDeclare("text_topic", BuiltinExchangeType.TOPIC,false,false,false,null);
channel.queueDeclare("topic_queue1",false,false,false,null);
channel.queueDeclare("topic_queue2",false,false,false,null);
//使用通配符去匹配routerkey
channel.queueBind("topic_queue1","text_topic","#.error");
channel.queueBind("topic_queue2","text_topic","*.*");
String msg = "topic主题订阅模式!!!";
//使用order.info 可以发布到queue2里
channel.basicPublish("text_topic","order.info",null,msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
}finally {
RabbitmqUtils.closeConnection();
}
运行代码访问web界面
访问queue1队列没有消息,访问queue2队列会 获得刚刚发送的消息



