它是一种接收数据,接收请求,发送数据,存储数据等功能的技术服务。
消息中间件利用可靠的消息传递机制进行系统和系统之间的通讯。
通过提供消息传递和消息的排队机制,它可以在分布式系统环境下扩展进程之间的通讯
消息中间件核心组成部分
- 消息的协议
- 消息的持久化机制
- 消息的分布策略
- 消息的高可用 高可靠
- 消息的容错机制
计算机底层操作系统与应用程序通讯之间共同遵守的一组约定,只有遵守了约定,两者才能互通交流。主要目的是让客户端(Java go…)可以进行沟通和通讯,和一般的网络应用程序不同,主要负责数据的接收和传递,所以性能较高,协议对数据格式和计算机之间交换数据都必须严格遵守规范。
面试题
为什么消息中间件不采用http协议
- 大部分http请求都是短链接,在实际应用过程中,一个请求到响应的过程,中途可能会中断,中断就不会满足持久化,就会造成数据的丢失,这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对消息进行持久化,目的就是保证数据的高可靠性和稳健的运行。
- 因为http请求报文头和响应报文头过长比较复杂,包含了cookie和加密解密,状态码 响应码等附加功能,但是对于一个消息而言,我们不需要这么复杂,其实就复杂数据的分发,传递,存储就行。
AMQP是高级消息队列协议,是一个提供统一消息应用服务标准的队列协议,是应用层协议的开放标准。Erlang中的RabbitMq就是采用的AMQP协议。
特性:
- 分布式事务支持
- 消息持久化支持
- 高性能和高可靠的数据处理优势
基于TCP/IP二进制协议,消息内部是通过长度来分割,有一些基本数据类型来组成。
特性:
4. 结构简单
5. 解析速度快
6. 无事务支持
7. 有持久化设计
MQ消息队列有如下几个角色
8. 生产者
9. 存储消息
10.消费者
那么生产者在生成消息后,MQ进行存储,消费者是如何获取消息的呢,一般获取数据的方法无非就是推送和拉取两种方式,常见的就是git。而消息队列MQ是一种推送的过程,而这些推机制会使用到很多的业务场景 ,例如订餐系统会分成几个子模块来进行一个订单的支付派送到完成。
消息队列的高可用和高可靠
高可用是指产品在规定的条件和规定的时刻内处于可执行规定功能和状态的能力。
当业务量增大时,请求也过大,一台西澳西中间件的服务器会触发硬件的极限,一台中间件的服务器已经无法满足业务的需求了,所以中间件必须支持集群部署来达到目的。
- publisher:生产者
- routeringkey:路由key,根据路由key来指定给谁发送消息
- message:消息。服务与应用程序之间传送的数据,由properties和body组成,
- Cnnection: 连接,应用程序与boker的网络连接的TCP/IP连接 三次握手与四次挥手
- channel:信道,几乎所有的操作都是在channel中进行,channel是进行消息读写的通道,客户端可以建立多个channel,一个channel代表一个任务会话。
- routingKey 是一个路由规则,虚拟机可以用它来确定发送消息到哪个路由
- Exchange:交换机 接收消息,根据路由key发送消息到指定队列(不具备消息储备能力)
- Bindings: Exchange和queue的虚拟连接, binding可以保护多个连接
- Queue:消息队列,保存消息并将他们发送给消费者。
若队列没有绑定特定的交换机,则会绑定默认的一个交换机:
一.简单模式
生产者 生成消息 发送到交换机
交换机:根据消息的属性,将消息发送到队列,若没有指定交换机,则采用默认的交换机
消费者:监听这个队列,发现消息后,获取消息执行的消费逻辑。
应用场景:一对一 例如手机短信 邮件单发
public class SimpleMode {
//初始化连接对象 短连接
private Channel channel;
@Before
public void channelInit() throws IOException, TimeoutException {
//ip:port tedu/123456
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("127.0.0.1");//ip地址
factory.setPort(5672);//端口号
factory.setUsername("guest");//用户名
factory.setPassword("guest");//密码
Connection connection = factory.newConnection();
channel=connection.createChannel();
}
//测试包含3个方法
//声明组件,交换机和队列,简单模式案例,交换机使用默认交换机.队列需要声明
@Test
public void myQueueDeclare() throws IOException {
//调用channel的方法,声明队列
channel.queueDeclare(
"simple",//设置路由key
false,//boolean类型,队列是否持久化
false,//boolean类型,队列是否专属,
// 只有创建声明队列的连接没有断开,队列才可用
false,//boolean类型,队列是否自动删除.从第一个消费端监听队列开始
//计算,直到最后一个消费端断开连接,队列就会自动删除
null);//map类型,key值是固定一批属性
System.out.println("队列声明成功");
}
//发送消息到队列 生产端,永远不会吧消息直接发给队列,发给交换机
//目前可以使用7个交换机来接收消息
@Test
public void send() throws IOException {
//准备个消息 发送的是byte[]
String msg="Hello,wangchanjun";
//将消息发给(AMQP DEFAULT)交换机 名字""
channel.basicPublish(
"",//发送给的交换机的名字,默认为空
"simple",//路由key,你想让交换机把消息传递给哪个队列的名称
null,//发送消息时,携带的头,属性等.例如
// app_id content-type priority优先级
msg.getBytes()//消息体
);
}
//消费端
@Test
public void consume() throws IOException {
//消费消息
channel.basicConsume("simple", false,
new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//从消息对象中拿到信息
byte[] body = message.getBody();
System.out.println(new String(body));
//如果autoAck false说明消费完消息,需要手动确认
channel.basicAck(
message.getEnvelope().getDeliveryTag(),
false);
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
//使用while true 将线程卡死,否则看不到消息消费逻辑
while(true);
}
}
二:工作队列模式
生产者:发送消息到交换机
交换机:根据消息属性将消息发送到消息队列
消费者:多个消费者,同时监听绑定一个队列,形成争抢消息的效果
应用场景:抢红包
public class WorkMode {
private Channel channel;
@Before
public void channelInit() throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
channel=connection.createChannel();
}
@Test
public void myQueueDeclare() throws IOException {
//调用channel的方法,声明队列
channel.queueDeclare("work",false,false,false, null);
System.out.println("队列声明成功");
}
@Test
public void send() throws IOException {
//准备个消息 发送的是byte[]
String msg="Hello Wang ";
byte[] msgByte=msg.getBytes();
//将消息发给(AMQP DEFAULT)交换机 名字""
channel.basicPublish(
"",//发送给的交换机的名字
"work",//路由key,你想让交换机把消息传递给哪个队列的名称
null,//发送消息时,携带的头,属性等.例如
// app_id content-type priority优先级
msgByte//消息体
);
}
//消费端
@Test
public void consume01() throws IOException {
//消费消息
channel.basicConsume("work", false,
new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//从消息对象中拿到信息
byte[] body = message.getBody();
System.out.println("消费者01:"+new String(body));
//如果autoAck false说明消费完消息,需要手动确认
channel.basicAck(
message.getEnvelope().getDeliveryTag(),
false);
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
//使用while true 将线程卡死,否则看不到消息消费逻辑
while(true);
}
@Test
public void consume02() throws IOException {
//消费消息
channel.basicConsume("work", false,
new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//从消息对象中拿到信息
byte[] body = message.getBody();
System.out.println("消费者02:"+new String(body));
channel.basicAck(
message.getEnvelope().getDeliveryTag(),
false);
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
//使用while true 将线程卡死,否则看不到消息消费逻辑
while(true);
}
}
三:发布与订阅模式(fanout)
若一个交换机绑定多个队列,生产者通过交换机发送消息,三个队列就能同时收到消息。
生产者:发送消息到交换机
交换机:由于是发布订阅模式,会将这个消息发送同步到后端与所有后端队列进行绑定
应用场景:邮件群发,广告群发,关注up主
public class FanoutMode{
private Channel channel;
@Before
public void channelInit() throws IOException TimeOutException{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(6379);
factory.setUsernane("guest");
factory.setPassword("guest");
Connection connection =factory.new Connection();
channel=connection.createChannel();
}
private static final String TYPE=”fanout“;
private static final String EX_NAME=TYPE+"_ex";
private static final String QUEUE01=TYPE+"_Q1";
private static final String QUEUE02+TYPE+"_Q2"
@Test
public void declare() throws IOException{
channel.queueDeclare(Queue,false,false,false,null);
channel.exchangeDeclare(EX_NAME,"");
channel.queueBind(QUEUE01,EX_NAME,"");
channel.queueBind(QUEUE02,EX_NAME,"");
}
public void send() throws IOException{
String msg="Hello,Wang ";
byte[] bytes=msg.getBytes();
channel.basicPublish(EX_NAME,"北京",null,bytes);
}
}
四:路由模式(direct)
通过路由key来向指定队列来发送消息
生产者:发送消息携带具体的路由key
交换机:接收路由key,判断和当前交换机绑定的后端队列哪个满足路由的匹配将消息发送给匹配到的路由
应用场景:处理一些特殊的消息逻辑,可以经过路由筛选
public class DirectMode {
//初始化连接
private Channel channel;
@Before
public void channelInit() throws IOException, TimeoutException {
//ip:port tedu/123456
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
channel=connection.createChannel();
}
//准备交换机,队列的名称属性
private static final String TYPE="direct";
private static final String EX_NAME=TYPE+"_ex";//fanout_ex
private static final String QUEUE01=TYPE+"_Q1";
private static final String QUEUE02=TYPE+"_Q2";
//声明三个绑定关系 一个交换机 2个队列
@Test
public void declare() throws IOException {
//声明队列
channel.queueDeclare(QUEUE01,false,false,false,null);
channel.queueDeclare(QUEUE02,false,false,false,null);
//只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系
//声明交换机
channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机
//绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意
channel.queueBind(QUEUE01,EX_NAME,"北京");
channel.queueBind(QUEUE01,EX_NAME,"广州");
channel.queueBind(QUEUE02,EX_NAME,"上海");
}
@Test
public void send() throws IOException {
String msg="你好,路由模式交换机";
byte[] bytes = msg.getBytes();
channel.basicPublish(EX_NAME,"北京",null,bytes);
}
}
主题模式(Topic)
结构
交换机绑定队列,不在使用具体的路由key,可以使用符号代替路由key的规则,进行模糊匹配:
- #:表示任意多级的字符串(0 级1级或者多级)
- *:任意长度字符串,(只代表一级)
例:
wang.chang.jun.study.com
可以匹配到以下规则生成的路由:
- wang.# - wang.chen.# - wang.*.*.* - wang.*.jun.*.* - *.chang.*.study.#
应用场景:
实现多级传送路由筛选工作,记录trace过程
public class TopicMode {
//初始化连接
private Channel channel;
@Before
public void channelInit() throws IOException, TimeoutException {
//ip:port tedu/123456
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.91.151");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
channel=connection.createChannel();
}
//准备交换机,队列的名称属性
private static final String TYPE="topic";
private static final String EX_NAME=TYPE+"_ex";//fanout_ex
private static final String QUEUE01=TYPE+"_Q1";
private static final String QUEUE02=TYPE+"_Q2";
//声明三个组件 一个交换机 2个队列
@Test
public void declare() throws IOException {
//声明队列
channel.queueDeclare(QUEUE01,false,false,false,null);
channel.queueDeclare(QUEUE02,false,false,false,null);
//只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系
//声明交换机
channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机
//绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意
channel.queueBind(QUEUE01,EX_NAME,"中国.北京.#");
channel.queueBind(QUEUE01,EX_NAME,"中国.*.*.*.*");
channel.queueBind(QUEUE02,EX_NAME,"*.上海.#");
}
@Test
public void send() throws IOException {
String msg="你好,路由模式交换机";
byte[] bytes = msg.getBytes();
channel.basicPublish(EX_NAME,"中国.北京.大兴.亦庄",null,bytes);
}
}



