1.RabbitMQ工作原理
1.1 术语
Broker:消息队列服务进程,此进程包括Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方
Producer:消息生产者,生产者通过通道将消息发送Broker
Consumer:消息消费者,接收消息队列转发的消息
1.2 发布流程
i.生产者和broker建立tcp连接
ii.生产者和broker建立通道
iii.生产者通过通道将消息发送给broker,由Exchange(消息交换机)将消息进行转发到指定的queue(队列)
1.3 接收流程
i.消费者和broker建立tcp连接
ii.消费者和broker建立通道
iii.消费者监听指定的queue(队列),当有消息到达queue时,broker默认将消息推送给消费者,消费者接收消息
2.添加依赖
com.rabbitmq amqp-client5.14.0
3.生产者代码
package com.ilearn.prducer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerMsg {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置连接的虚拟机(这里连接默认的虚拟机)
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//生产者建立tcp连接
connection = connectionFactory.newConnection();
//创建exchange通道(每个连接可以创建多个通道,一个通道代表一个会话)
channel = connection.createChannel();
//声明队列(参数1:队列名字,参数2:队列中的消息是否持久化,参数3:队列是否独占此连接,参数4:队列不在使用时是否删除此队列,参数5:队列额外参数设定)
channel.queueDeclare("test-queue", true, false, false,null);
//发布消息(参数1:交换机,如不指定会使用默认交换机,参数2:消息的路由key,是用于Exchange(交换机)将消息转发到指定的消息队列,参数3:消息的属性信息,参数4:消息内容)
channel.basicPublish("","test-queue",null,"我是一条测试消息".getBytes());
System.out.println("消息发送完成!!!");
} catch (Exception e) {
e.printStackTrace();
} finally {
channel.close();
connection.close();
}
}
}
4.生产者执行结果
5.消费者代码
package com.ilearn.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerMsg {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
//创建连接
connection = connectionFactory.newConnection();
//创建通道
channel = connection.createChannel();
//声明队列
channel.queueDeclare("test-queue",true,false,false,null);
//创建监听
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机名字
String exchange = envelope.getExchange();
//路由key
String routeKey = envelope.getRoutingKey();
//消息id
long msgId = envelope.getDeliveryTag();
//消息内容
String msg = new String(body,"utf-8");
System.out.println("交换机" + "t" + "路由key" + "t" + "消息id" + "t" + "消息内容");
System.out.println(exchange + "t" + routeKey + "t" + msgId + "t" + msg);
}
};
//监听队列(参数1:监听的队列,参数2:是否自动给消息队列发送消息回执,参数3:消息消息的方法)
channel.basicConsume("test-queue",true,defaultConsumer);
} catch (Exception e) {
e.printStackTrace();
} finally {
channel.close();
connection.close();
}
}
}
6.消费者执行结果
7.RabbitMQ六种工作模式
7.1 简单队列模式
只包含一个生产者一个一个消费者,生产者将消息发送给队列中,消费者从队列中消费消息。单生产单消费。
代码如上所示
7.2 工作队列模式-轮询分发
多个消费者绑定在一个队列上,一条信息只能被一个消费者消费,消费者轮询消费队列中的消息。 注:这种模式就是简单队列模式情况下,起两个消费者就可以了。 7.3 工作队列模式-公平分发 多个消费者绑定在一个队列上,一条信息只能被一个消费者消费,消费者效率高的消费更多的消息。
代码(只需要调整消费者代码即可):
a)手动设置消息被消费了
handleDelivery方法增加如下代码:
channel.basicAck(deliveryTag,false);
b)消费者消息应答模式修改为手动回复
channel.basicConsume("test-queue",false,defaultConsumer);
c) 创建通道以后,设置消费者每次接收的消息数
channel.Qos(n);//n为数值



