- 消息队列:
- 一种进程通信或同一进程的不同线程的通信方式。
- 采用链表结构实现,拥有权限的进程向消息队列写入或读取消息。
- 常用MQ:RabbitMQ、ActiveMQ、kafka,Redis的Lis。
- AMQP协议:应用层高级消息队列协议,面向消息,队列,路由(点对点和发布/订阅),可靠性,安全。
- 消息的发送者与接收者不需要同时与消息队列交换数据。
- 优点:
- 解耦,将新增的系统写入消息队列,只需要的系统去消息队列中订阅,其他系统不做改变。
- 异步,一些不需要即时处理的任务,写入消息队列,由其他处理任务的系统调用,不占用主业务。
- 销峰,大量并发请求写入消息队列,系统根据服务器压力缓慢处理。排队。
- 用于分布式系统中存储转发消息
- 基于AMQP协议,高并发、高性能、高可用
- 服务端Erlang实现
-
创建连接工厂,设置连接参数ConnectionFactory
-
创建连接对象Connection
-
创建通道Channel
-
创建队列QueueDeclare(队列名字,持久化,是否排他队列,是否自动删除,自定义参数信息)
-
发送消息到指定队列basicPublish(交换机,路由键,自定义参数,数据的字节数组)
-
创建消费者
- DefaultConsumer(Channel),重写handleDelivery(消费者标签,信封,参数值,接收消息内容)方法
- DeliverCallback=(consumerTag,delivery)->{}函数式接口, -
监听指定队列
- basicConsume(队列名,自动Ack,消费者)
- basicConsume(队列名,自动Ack,DeliverCallback,consumerTag->{})
排他队列:该队列仅对首次声明它的连接可见,连接断开时自动删除。
- 基于连接可见,同一个连接的不同通道可以同时访问这个队列
- 首次,其他连接不允许建立同名的排他队列。
- 即使队列持久化的,一旦连接关闭,排他队列都会被删除。
轮询:从1到N循环分配消息。
公平分发:性能强的机器多分发消息,弱的少分发。
- 设置通道接收消息的数量basicQos小于等于1
- 当任务执行完毕后返回手动回执ACK,basicAck(信封中标签)。
- 现将消息投递到Exchange交换机,经过交换机在路由到相关队列。
- 多个消费者属于自己的队列
队列只要绑定交换机,交换机就可以将消息路由到所有队列。
- 通过通道Channel创建交换机exchangeDeclare(交换机名字,BuiltinExchangeType.FANOUT)
- 发送消息到指定队列basicPublish(交换机,路由键,自定义参数,数据的字节数组)
- 消费者自己创建队列 绑定交换机queueBind(队列名字,交换机名字,路由键)
- 消息类型Direct类型
- 通过定义的Routing key路由键被路由到指定队列。
- routing key . 表示分隔字符串
- * 表示匹配一个单词 #表示匹配多个大于0
MQ实现RPC机制:同步操作,需要等待
- 客户端发起请求,在消息属性(AMQP中定义的属性)中设置两个值replyTo(Queue名称)和correlationId(请求Id)发送到服务端
- 服务端收到消息进行处理。
- 处理完后生成一条应答消息到replyTo指定的队列中,同时携带correlationId属性。
- 客户端之前已经订阅了replyTo指定的Queue,从收到应答消息中,根据correlationId分析那条结果被执行了。
- 通过AMQP事务机制实现。
try{ // 开启事务 channel.txSelect() ...... //提交事务 channel.txCommit(); }catch(Exception e){ //发生异常回滚 channel.txRollback(); } - 通过Channel设置/confirm/i模式实现。
- 同步
// 开启/confirm/i确认模式 channel./confirm/iSelect() ...... //同步操作-----普通/confirm/i模式,每发一条消息,调用一次waitFor/confirm/is,等待服务端/confirm/i channel.waitFor/confirm/is(); //同步操作-----批量/confirm/i模式,每发一批消息,调用一次waitFor/confirm/isOrDie,等待服务端/confirm/i channel.waitFor/confirm/isOrDie();
- 异步
// 开启/confirm/i确认模式 channel./confirm/iSelect() //构建一个维护信息发送回执序列号的线程安全有序集合 final SortedSet
/confirm/iSet=Collections.syschronizedSortedSet(new TreeSet ()); //发送消息 //获取未确认消息序列号,添加到SortedSet中 /confirm/iSet.add(channel.getNextPublishSeqNo()) //添加channel监听,判断消息是否确认 channel.add/confirm/iListener(new /confirm/iListener(){ //以确认的消息 handleAck() { if(multiple){ //true确认多条 //清除前deliveryTag标识Id /confirm/iSet.headSet(deliveryTag+1L).clear(); }else{ //false确认单条 /confirm/iSet.remove(deliveryTag); } } //未确认的消息 handleNack() { if(multiple){ //true未确认多条 //清除前deliveryTag标识Id /confirm/iNackSet.headSet(deliveryTag+1L).clear(); }else{ //false未确认单条 /confirm/iNackSet.remove(deliveryTag); } } })
- 同步



