- AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
-
流量消峰
-
应用解耦
-
异步处理
- 生产者
产生数据发送消息的程序 - 消费者
等待接收消息的程序 - 交换机
它接收来自生产者的消息,将消息推送到队列中 - 队列
内部使用的一种数据结构,生产者可以将消息发送到一个队列,消费者可以尝试从一个队列接收数据
- 官网地址 https://www.rabbitmq.com/download.html
- 工具的下载可以去下面链接去下载
链接: https://pan.baidu.com/s/1mH-3Q3_i_c8BNMaKwVgmTA 提取码: y5vu
- 由于RabbitMQ是由Erlang语言编写的所以需要先安装这个Erlang安装包。
- 先把上面下载的两个软件下载并且上传到linux的/opt目录下去,然后进行解压安装,安装的流程:
rpm -ivh erlang-21.3-1.el7.x86_64.rpm yum install socat -y # 安装rabbitmq需要先安装这个,不然安装不了 rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
- 启动服务
/sbin/service rabbitmq-server start # 启动rabbitmq命令 /sbin/service rabbitmq-server stop # 关闭rabbitmq的命令 rabbitmq-plugins enable rabbitmq_management # 开启web管理插件 # 关闭linux的防火墙 systemctl stop firewalld # 本机访问网址 192.168.26.134:17256 # 访问使用默认的账号guest,密码guest登录发现权限不够
- 创建用户
rabbitmqctl add_user admin admin # 创建账号 rabbitmqctl set_user_tags admin administrator #设置用户角色 set_permissions [-p七: Hello World!(The simplest thing that does something)] rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" #设置用户权限 rabbitmqctl list_users #查看当前用户和角色
“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列
- 通过maven引入rabbitmq的依赖
com.rabbitmq amqp-client 5.8.0
生产者:
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接rabbitmq主机
connectionFactory.setHost("192.168.26.134");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接那个虚拟主机(类似于数据库中的库)
connectionFactory.setVirtualHost("/ems");
// 设置连接放的虚拟主机的放用户名和密码
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 获取连接对象
Connection connection = connectionFactory.newConnection();
// 获取连接中的通道
Channel channel = connection.createChannel();
// 通道绑定对应消息队列
channel.queueDeclare("hello", false, false, false, null);
// 发布消息
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "helloRabbitMQ".getBytes());
channel.close();
connection.close();
}
消费者
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接rabbitmq主机
connectionFactory.setHost("192.168.26.134");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接那个虚拟主机(类似于数据库中的库)
connectionFactory.setVirtualHost("/ems");
// 设置连接放的虚拟主机的放用户名和密码
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// 获取连接对象
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
// 消费消息
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("取出的消息:" + new String(body));
}
});
// channel.close();
// connection.close();
}
八:Work queues
- 简介:
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务
- 先把连接rabbitmq的一些代码先封装起来
private static ConnectionFactory connectionFactory;
// 只执行一次
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.26.134");
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setPort(5672);
}
// 定义一共连接对象的方法
public static Connection getConnection() {
Connection connection = null;
try {
connection = connectionFactory.newConnection();
return connection;
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
public static void closeConnectionAndChanel(Channel channel,Connection connection){
try {
if (channel!=null) {
channel.close();
}
if (connection!=null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
- 生产者
Channel channel = ConnectionUtils.getChannel();
// 开启发布确认
channel./confirm/iSelect(); channel.queueDeclare(ConnectionUtils.QUEUE_NAME,false,false,false,null);
String message = "121212";
channel.basicPublish("",ConnectionUtils.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
- 消费者1
Channel channel = ConnectionUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,message) -> {
System.out.println(new String(message.getBody()));
try {
Thread.sleep(1000);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag);
};
// 设置不公平分发
// channel.basicQos(1);
channel.basicConsume(ConnectionUtils.QUEUE_NAME,false,deliverCallback,cancelCallback);
- 消费者2
Channel channel = ConnectionUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,message) -> {
System.out.println(new String(message.getBody()));
try {
Thread.sleep(10000);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
// 设置手动应答
} catch (InterruptedException e) {
e.printStackTrace();
}
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag);
};
// 设置不公平分发
// channel.basicQos(1);
channel.basicConsume(ConnectionUtils.QUEUE_NAME,false,deliverCallback,cancelCallback);
- 默认是使用轮询的方式进行访问,当消费者1抢到第一个消息的时候,消费者2就会抢到消息,一人一条的方式进行消息的分发
// 设置不公平分发 channel.basicQos(1); # 设置值为1就是代表不公平的分发
- 自动应答和手动应答
自动应答一般用于需要进行高性能的处理能力,但是可能会有消息的丢失 手动应答可以尽量避免消息的丢失,当其中的一个消息没有被消费完成的时候还可以被其他的消费者消费 channel.basicConsume(ConnectionUtils.QUEUE_NAME,false,deliverCallback,cancelCallback); # 第二个参数需要设置为 false,代表手动确定消息 channel.basicAck(message.getEnvelope().getDeliveryTag(),false); # 代表是手动确认消息,第二个参数为false代表不适用批量处理
- RabbitMQ 的持久化
1. 队列实现持久化:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
# 第二个参数设置为true,则代表队列重启的话,该队列也不会被删除,代表的是队列的持久化
# 注意:如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
# 如果队列是持久化的话,队列哪里就会显示一个 D
2. 消息持久化:
channel.basicPublish("",ConnectionUtils.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
# 第三个参数设置为 MessageProperties.PERSISTENT_TEXT_PLAIN 代表的是消息的持久化,如果只是设置了队列的持久化没有设置消息的持久化的话,重启之后消息也还是不会存在的。
- 发布确认
简介:发布确认默认是没有开启的,如果要开启需要调用方法/confirm/iSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法
9.1 :单个确认发布
它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitFor/confirm/isOrDie(long)这个方法只有在消息被确认
的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布
//生产者开启发布确认
channel./confirm/iSelect();
for (int i = 0; i < 100; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitFor/confirm/is();
if(flag){
System.out.println("消息发送成功");
}
}
9.2 : 批量确认发布 简介:先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题
//开启发布确认
channel./confirm/iSelect();
//批量确认消息大小
int batchSize = 100;
//未确认消息个数
int outstandingMessageCount = 0;
for (int i = 0; i < 1000; i++)
{
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize)
{
channel.waitFor/confirm/is();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0)
{channel.waitFor/confirm/is();
}
9.3:异步确认发布
try (Channel channel = RabbitMqUtils.getChannel())
{String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel./confirm/iSelect();
ConcurrentSkipListMap outstandingConfirms = new
ConcurrentSkipListMap<>();
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap confirmed =
outstanding/confirm/is.headMap(sequenceNumber, true);
//清除该部分未确认消息
/confirm/ied.clear();
}else{
//只清除当前序列号的消息
outstanding/confirm/is.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) ->
{String message = outstanding/confirm/is.get(sequenceNumber);
System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
channel.add/confirm/iListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++)
{String message = "消息" + i;
outstanding/confirm/is.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());



