在计算机科学中,消息队列((英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。
1.1.实现消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。目前,有很多消息队列有很多开源的实现,包括JBoss Messaging.JORAM、Apache ActiveMQ、Sun0pen Message Queue、IBM MQ、Apache Qpid和HTTPSQS。当前使用较多的消息队列有RabbitMQ、RocketNQ、ActiveMQ、Kafka、ZeroNQ、metaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。 1.2.特点
MQ是消费者-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
注意:
- AMQP,即DAdvanced MessageQueuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。JMS,Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API,
用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。常见的消息队列,大部分都实现了JMSAPI,如
ActiveMQ ,Redis以及 RabbitMQ等。
优点
应用耦合、异步处理、流量削锋
解耦
传统模式:
传统模式的缺点:
系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
中间件模式:
中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
。异步
传统模式:
传统模式的缺点:
—些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式:
中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
削峰
传统模式:
传统模式的缺点:
并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
中间件模式:
中间件模式的的优点:
系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
缺点
系统可用性降低、系统复杂性增加
消息队列,是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlangi语言编写,支持多种客户端,如: Python、
Ruby、 .NET,Java,JMS、C,PHP,Actionscript, XMPP,STONP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
总结如下:
基于AMQP协议高并发(是一个容量的概念,服务器可以接受的最大任务数量)。高性能(是一个速度的概念,单位时间内服务器可以处理的任务数)高可用(是一个持久的概念,单位时间内服务器可以正常工作的时间比例)。强大的社区支持,以及很多公司都在使用支持插件支持多语言 二、概念
RabbitMQ简介:RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout, topic, 和headers(headers和direct交换器完全一致,但性能差很多,目前几乎用不到),不同类型的Exchange转发消息的策略有所区别。
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。
Connection:网络连接,比如一个TCP连接。
Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP 连接。
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加
密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Broker:表示消息队列服务器实体
三、安装- 获取镜像
#指定版本,该版本包含了web控制页面 docker pull rabbitmq:management
- 运行镜像
#方式一:默认guest 用户,密码也是 guest docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management #方式二:设置用户名和密码 docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management #方式三 docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management # 4369, 25672 (Erlang发现&集群端口) # 5672, 5671 (AMQP端口) 15672 (web管理后台端口) # 61613, 61614 (STOMP协议端口) # 1883, 8883 (MQTT协议端口) # https://www.rabbitmq.com/networking.html
- 访问ui界面
http://localhost:15672/四、SpringBoot整合RabbitMQ 4.1. 引入依赖
4.2. application.yml配置org.springframework.boot spring-boot-starter-amqp
spring:
rabbitmq:
host: 192.168.10.123
port: 5672
virtual-host: /
#开启发送端确认
#publisher-/confirm/is: true
publisher-/confirm/i-type: correlated
# 开启发送端消息抵达队列的确认
publisher-returns: true
template:
#只要抵达队列,以异步发送优先调用我们这个return - /confirm/i
mandatory: true
listener:
simple:
#手动ack消息(手动确认消息是否消费)
acknowledge-mode: manual
4.3. RabbitConfig配置类
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
//设置确认回调
rabbitTemplate.set/confirm/iCallback((correlationData,ack,cause) -> {
System.out.println("/confirm/i...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}
4.4. RabbitMQConfig(容器中创建交换机、队列和绑定)
package com.lyh.mall.order.config;
import com.lyh.mall.order.entity.OrderEntity;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.HashMap;
@Configuration
public class MyRabbitMQConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue orderDelayQueue() {
HashMap arguments = new HashMap<>();
//死信路由
arguments.put("x-dead-letter-exchange", "order-event-exchange");
//死信路由键
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
//创建队列
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
@Bean
public Queue orderReleaseQueue() {
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
@Bean
public Exchange orderEventExchange() {
return new TopicExchange("order-event-exchange", true, false);
}
@Bean
public Binding orderCreateBinding() {
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
@Bean
public Binding orderReleaseBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
@Bean
public Binding orderReleaseOtherBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
@Bean
public Queue orderSecKillOrrderQueue() {
Queue queue = new Queue("order.seckill.order.queue", true, false, false);
return queue;
}
@Bean
public Binding orderSecKillOrrderQueueBinding() {
//String destination, DestinationType destinationType, String exchange, String routingKey,
// Map arguments
Binding binding = new Binding(
"order.seckill.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.seckill.order",
null);
return binding;
}
}
4.5. 测试代码
AmqpAdmin:管理组件RabbitTemplate:消息发送处理组件@RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序)Object content, Message message,Channel channel
import com.lyh.mall.order.entity.OrderReturnReasonEntity;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
@SpringBootTest
class MallOrderApplicationTests {
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void sendMessige(){
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("你好!");
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity);
System.out.println("发送消息成功!!");
}
@Test
void createExchange() {
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
System.out.println("创建成功");
}
@Test
void createQueue() {
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
System.out.println("创建成功");
}
@Test
void createBinding() {
//String destination【目的地】,
// DestinationType destinationType【目的地类型】,
// String exchange【交换机】,
// String routingKey【路由键】,
//@Nullable Map arguments【自定义参数】
Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
amqpAdmin.declareBinding(binding);
System.out.println("创建成功");
}
}
//service层加监听注解获取 消息数据
@RabbitListener(queues = {"hello-java-queue"})
//这个类的这个方法才能接受hello-java-queue消息
//@RabbitHandler //类上加注解@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) {
//拿到消息体
// byte[] body = message.getBody();
//拿到消息头
// MessageProperties properties = message.getMessageProperties();
System.out.println("接收到消息:" + content);
//消息处理完 手动确认 deliveryTag在Channel内按顺序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag->" + deliveryTag);
try {
if (deliveryTag % 2 == 0) {
//确认签收 队列删除该消息 false非批量模式
channel.basicAck(deliveryTag, false);
} else {
//拒收退货 第三个参数 -> true:重新入队 false:丢弃
channel.basicNack(deliveryTag, false, true);
}
} catch (IOException e) {
//网络中断
}
}
// @RabbitHandler
//public void receiveMessage2(OrderEntity content) {
// System.out.println("接收到消息:" + content);
//}



