安装教程:https://blog.csdn.net/weixin_42673046/article/details/118442323
启动等命令
进入到rabbitmq文件目录下sbin文件夹下面执行命令
后台启动: ./rabbitmq-server -detached
查看状态:./rabbitmqctl status
应用和节点都将被关闭:./rabbitmqctl stop
应用启动:./rabbitmqctl start_app
应用关闭节点不关:./rabbitmqctl stop_app
开启管理插件web页面:./rabbitmq-plugins enable rabbitmq_management
查看插件列表状态:./rabbitmq-plugins list
springboot整合rabbitMQ的demo项目放在本地盘中
一 ,rabbitmq中消息的几种模型第一种:直连
生产者(P) —> 队列 —> 消费者(C)
生产者连接队列发布消息方法
public static void clientRb() throws IOException, TimeoutException {
//rabbitMQ连接步骤
ConnectionFactory connectionFactory = new ConnectionFactory();
//参数设置
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
//要连接的虚拟主机
connectionFactory.setVirtualHost("/dmo");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//创建连接对象
Connection connection = connectionFactory.newConnection();
//获取中间通道
//需要通过中间通道将消息发送到队列
Channel channel = connection.createChannel();
//设置消息连接信息 给通道绑定队列
//参数意思:1.队列名称(主机中没有会自动创建)
//2.是否开启持久化(保存队列信息) 此处只是队列持久化,消息不会持久化保存 不开启每次重启rabbitmq会删掉之前的队列
//3.是否独占队列(只允许当前客户端向队列发送消息)
//4.是否在消息消费完成后自动删除当前队列(消费者消费完并且断开连接才会删除)
//5.附加参数
channel.queueDeclare("helloRb",false,false,false,null);
//直连模式中其实不绑定队列也可以直接发布消息,因为下面发布消息的方法中参数路由key对应着队列名称
//直连模式下消息直接发布到AMQP default默认交换机中,默认交换机与所有队列建立隐含绑定关系,路由的key就是队列名
//所以在发布方法中直接写上队列名称也可以直接发送到队列中
//发布消息
//参数意思:1.交换机名称 2.路由key:因为默认交换机绑定了所有队列,在直连模式中可以直接送到队列中 3.发布消息的额外设置 4.发布消息集体内容
//MessageProperties.PERSISTENT_TEXT_PLAIN --开启消息持久化
channel.basicPublish("", "helloRb", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ3".getBytes());
channel.close();
connection.close();
}
消费者方法
//在main函数中运行,因为方法调用时运行结束会直接杀死线程
public static void main(String[] args) throws IOException, TimeoutException {
//rabbitMQ连接步骤
//前面的连接步骤 都一样
//参数设置
RabbitMQConnectionUtil rabbitMQConnectionUtil = new RabbitMQConnectionUtil();
Connection connection = rabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//这里的参数要与队列一致 否则会报错
//例如生产者新建的是一个消息开启持久化的队列,消费者这里绑定也要设置成持久化才能接收,否则报错
channel.queueDeclare("helloRb", true, false, false, null);
//获取/消费消息
//参数意思: 1.消息队列名称
//2.是否自动确认(此处为true) 只有确认了的消息才认为被消费完成了
//3.消费回调可以在其中获取到消息内容等操作
channel.basicConsume("helloRb", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("获取到一条消息:"+ new String(body));
}
});
//这两行会直接关掉连接收不到消息
// channel.close();
// connection.close();
}
封装的工具类
public class RabbitMQConnectionUtil {
//对于较重量级的资源放到静态代码块只创建加载一次
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/dmo");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
}
//连接方法封装
public Connection getConnection() throws IOException, TimeoutException {
try {
Connection connection = connectionFactory.newConnection();
return connection;
}catch (Exception e){
System.out.println(e.getMessage());
}
return null;
}
//关闭
public void closeConnection(Channel channel, Connection connection){
try {
if(channel != null && connection != null){
channel.close();
connection.close();
}
}catch (Exception e){
System.out.println(e.getMessage());
}
}
}
第二种:工作队列(Work Queue)
模型:
这种会有多个消费者订阅了这个消息队列,每个消费者会收到相同数量的消息,消费方式的机制为轮询,即:A第一条 B第二条 C第三条 A第四条 B第五条…这种方式来消费消息,每个消费者都会收到相同数量的消息,多余的按顺序消费。
即使某个消费者消费速度慢,执行速度慢,也会按照轮询来消费。
第二种修改:工作队列之能者多劳模式
模型与上面一样
假设有消费者A,B。消费者A执行速度很慢,B是正常速度,使用此模式B会消费大部分消息,A消费小部分
开启此模式消费者代码
1.通道设置每次只消费一个消息
2.关闭自动确认消息
3.开启手动确认消息
public static void main(String[] args) throws IOException, TimeoutException {
//rabbitMQ连接步骤
//前面的连接步骤 都一样
//参数设置
RabbitMQConnectionUtil rabbitMQConnectionUtil = new RabbitMQConnectionUtil();
Connection connection = rabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//设置每次只能消费一个消息
channel.basicQos(1);
//这里的参数要与队列一致 否则会报错
//例如生产者新建的是一个消息开启持久化的队列,消费者这里绑定也要设置成持久化才能接收,否则报错
channel.queueDeclare("workQue", true, false, false, null);
//获取/消费消息
//参数意思: 1.消息队列名称
//2.是否自动确认(此处改为false) 只有确认了的消息才认为被消费完成了
//3.消费回调可以在其中获取到消息内容等操作
channel.basicConsume("workQue", false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println("获取到一条消息:"+ new String(body));
//参数意思
//1.确认标识
//2.是否开启确认多个消息,否一次一个 此处每次消费一个消息,true,false都一样
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
第三种:fanout模式/广播模式
模型:
此时生产者只能将消息发送到交换机中,由交换机来决定发送到哪个队列,每个消费者都有自己的队列(由交换机生成),每个队列都绑定了交换机,交换机会把消息发给所有的队列,实现一个消息被多个消费者消费。
生产者代码:
public static void clientRb() throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//1.交换机名称 2.交换机类型 fanout广播类型
channel.exchangeDeclare("register","fanout");
//此处就需要指定交换机名称
channel.basicPublish("register","",null,"fanout type msg".getBytes());
RabbitMQConnectionUtil.closeConnection(channel, connection);
}
消费者代码:
//在main函数中运行,因为方法调用时运行结束会直接杀死线程
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("register", "fanout");
//获取临时队列名称
//fanout类型交换机会为每个消费者生成一个对应的临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机
channel.queueBind(queueName,"register","");
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer_A:"+ new String(body));
}
});
//这两行会直接关掉连接收不到消息
// channel.close();
// connection.close();
}
第四种:Routing模式/路由模式
交换机模式为direct模式
模型:
此时队列与交换机之间的绑定就不是直接绑定了,而是根据路由来绑定队列,即需要指定一个RoutingKey。
生产者在发送消息到交换机中时也需要指定RoutingKey
交换机会根据RoutingKey来判断该把消息放到哪条队列中
生产者方法:
public static void clientRb() throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//1.交换机名称 2.交换机类型 direct直连类型
channel.exchangeDeclare("register_direct","direct");
String routingKey = "error";
//此处就需要指定交换机名称与路由key
channel.basicPublish("register_direct",routingKey,null,("routing msg routingKey = "+routingKey).getBytes());
RabbitMQConnectionUtil.closeConnection(channel, connection);
}
消费者A方法:
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机 以及类型
channel.exchangeDeclare("register_direct","direct");
//获取临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定临时队列以及交换机
//可以绑定多个队列
//参数 1.队列名称 2.交换机名称 3.路由key
channel.queueBind(queueName,"register_direct","error");
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerB收到消息:" + new String(body));
}
});
}
消费者B方法:
绑定了多个队列路由key
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("register_direct", "direct");
//获取临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机与临时队列
channel.queueBind(queueName,"register_direct","error");
channel.queueBind(queueName,"register_direct","info");
channel.queueBind(queueName,"register_direct","warning");
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerB收到消息:" + new String(body));
}
});
}
第五种:topic模式/主题模式/动态路由
模型:
基本与上一个模式相同,不同点就是routingKey在这种模式中可以使用通配符,可以做到一次性绑定多个队列。
通配符:
*:匹配一个单词
#:匹配多个单词
例:user.*可以匹配user.save、user.ok,但是不能匹配user.save.good
user.#可以匹配user.save.good等,不能匹配del.user
生产者方法:
private static void creatMQ() throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String routingKey = "user.save.ok";
channel.basicPublish("topics",routingKey,null,"this is topic msg".getBytes());
RabbitMQConnectionUtil.closeConnection(channel, connection);
}
消费者方法:
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("topics", "topic");
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,此处routingkey可以以通配符形式设置
// *:代表匹配一个单词 #:匹配多个单词
channel.queueBind(queueName, "topics", "user.*");
channel.basicConsume(queueName,true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerA收到:" + new String(body));
}
});
}



