栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ笔记

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ笔记

MQ
  • 消息队列:
    • 一种进程通信或同一进程的不同线程的通信方式。
    • 采用链表结构实现,拥有权限的进程向消息队列写入或读取消息。
    • 常用MQ:RabbitMQ、ActiveMQ、kafka,Redis的Lis。
    • AMQP协议:应用层高级消息队列协议,面向消息,队列,路由(点对点和发布/订阅),可靠性,安全。
    • 消息的发送者与接收者不需要同时与消息队列交换数据。
  • 优点:
    • 解耦,将新增的系统写入消息队列,只需要的系统去消息队列中订阅,其他系统不做改变。
    • 异步,一些不需要即时处理的任务,写入消息队列,由其他处理任务的系统调用,不占用主业务。
    • 销峰,大量并发请求写入消息队列,系统根据服务器压力缓慢处理。排队。
RabbitMQ
  • 用于分布式系统中存储转发消息
  • 基于AMQP协议,高并发、高性能、高可用
  • 服务端Erlang实现
基础消息队列
  1. 创建连接工厂,设置连接参数ConnectionFactory

  2. 创建连接对象Connection

  3. 创建通道Channel

  4. 创建队列QueueDeclare(队列名字,持久化,是否排他队列,是否自动删除,自定义参数信息)

  5. 发送消息到指定队列basicPublish(交换机,路由键,自定义参数,数据的字节数组)

  6. 创建消费者
    - DefaultConsumer(Channel),重写handleDelivery(消费者标签,信封,参数值,接收消息内容)方法
    - DeliverCallback=(consumerTag,delivery)->{}函数式接口,

  7. 监听指定队列
    - basicConsume(队列名,自动Ack,消费者)
    - basicConsume(队列名,自动Ack,DeliverCallback,consumerTag->{})

排他队列:该队列仅对首次声明它的连接可见,连接断开时自动删除。

  • 基于连接可见,同一个连接的不同通道可以同时访问这个队列
  • 首次,其他连接不允许建立同名的排他队列。
  • 即使队列持久化的,一旦连接关闭,排他队列都会被删除。
工作队列的轮询&公平分发(适用生成环境)

轮询:从1到N循环分配消息。
公平分发:性能强的机器多分发消息,弱的少分发。

  • 设置通道接收消息的数量basicQos小于等于1
  • 当任务执行完毕后返回手动回执ACK,basicAck(信封中标签)。
发布订阅模式
  • 现将消息投递到Exchange交换机,经过交换机在路由到相关队列。
  • 多个消费者属于自己的队列
fanout广播模式

队列只要绑定交换机,交换机就可以将消息路由到所有队列。

  1. 通过通道Channel创建交换机exchangeDeclare(交换机名字,BuiltinExchangeType.FANOUT)
  2. 发送消息到指定队列basicPublish(交换机,路由键,自定义参数,数据的字节数组)
  3. 消费者自己创建队列 绑定交换机queueBind(队列名字,交换机名字,路由键)
routing路由模式
  • 消息类型Direct类型
  • 通过定义的Routing key路由键被路由到指定队列。
Topics主题模式
  • routing key . 表示分隔字符串
  • * 表示匹配一个单词 #表示匹配多个大于0
RPC远程服务调用

MQ实现RPC机制:同步操作,需要等待

  1. 客户端发起请求,在消息属性(AMQP中定义的属性)中设置两个值replyTo(Queue名称)和correlationId(请求Id)发送到服务端
  2. 服务端收到消息进行处理。
  3. 处理完后生成一条应答消息到replyTo指定的队列中,同时携带correlationId属性。
  4. 客户端之前已经订阅了replyTo指定的Queue,从收到应答消息中,根据correlationId分析那条结果被执行了。
事务控制
  1. 通过AMQP事务机制实现。
    try{
    	// 开启事务
    	channel.txSelect()
    	......
    	//提交事务
    	channel.txCommit();
    }catch(Exception e){
    	//发生异常回滚
    	channel.txRollback();
    }
    
  2. 通过Channel设置/confirm/i模式实现。
    • 同步
      // 开启/confirm/i确认模式
      channel./confirm/iSelect()
      ......
      //同步操作-----普通/confirm/i模式,每发一条消息,调用一次waitFor/confirm/is,等待服务端/confirm/i
      channel.waitFor/confirm/is();
      //同步操作-----批量/confirm/i模式,每发一批消息,调用一次waitFor/confirm/isOrDie,等待服务端/confirm/i
      channel.waitFor/confirm/isOrDie();
      
    • 异步
      // 开启/confirm/i确认模式
      channel./confirm/iSelect()
      //构建一个维护信息发送回执序列号的线程安全有序集合
      final SortedSet /confirm/iSet=Collections.syschronizedSortedSet(new TreeSet());
      //发送消息
      	//获取未确认消息序列号,添加到SortedSet中
      	/confirm/iSet.add(channel.getNextPublishSeqNo())
      //添加channel监听,判断消息是否确认
      channel.add/confirm/iListener(new /confirm/iListener(){
      	//以确认的消息
      	handleAck() {
      		if(multiple){
      			//true确认多条
      			//清除前deliveryTag标识Id
      			/confirm/iSet.headSet(deliveryTag+1L).clear();
      		}else{
      			//false确认单条
      			/confirm/iSet.remove(deliveryTag);
      		}
      	}
      	//未确认的消息
      	handleNack() {
      		if(multiple){
      			//true未确认多条
      			//清除前deliveryTag标识Id
      			/confirm/iNackSet.headSet(deliveryTag+1L).clear();
      		}else{
      			//false未确认单条
      			/confirm/iNackSet.remove(deliveryTag);
      		}
      	}
      
      
      
      })
      
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/295081.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号