栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

RabbitMQ 3.9( 续 )

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ 3.9( 续 )

 优质资源分享 
学习路线指引(点击解锁)知识定位人群定位
李 Python实战微信订餐小程序 李进阶级本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
Python量化交易实战入门级手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

目录

前言
  • 基础篇链接:https://blog.csdn.net/xiegongzi/p/16229678.html
3.9、延迟队列 - 重要 3.9.1、延迟队列概念
  • 这个玩意儿要表达的意思其实已经见过了,就是死信队列中说的TTL消息过期,但是文字表达得换一下

  • 所谓的延迟队列:就是用来存放需要在指定时间内被处理的元素的队列,其内部是有序的

  • 使用场景:

    • 1、支付时,订单在30分钟以内未支付则自动取消支付
    • 2、退款,用户发起退款,在3天以后商家还未处理,那官方便介入其中进行处理
  • 玩延迟队列需要具备的条件:

    • 1、具备死信队列知识
    • 2、具备TTL知识
    • 然后将这二者结合,加一些东西,上好的烹饪就做好了
  • 实现如下的逻辑

  • P:生产者
  • X:正常交换机
  • Y:死信交换机
  • QA、QB:正常队列
  • QD:死信队列
  • XA、XB:正常交换机、正常队列的routing key
  • YD:死信交换机、死信队列的routing key
3.9.2、集成SpringBoot 3.9.2.1、依赖

|  |  |
|  |  |
|  |  |
|  | org.springframework.bootgroupId> |
|  | spring-boot-starter-amqpartifactId> |
|  | dependency> |
|  |  |
|  | org.springframework.amqpgroupId> |
|  | spring-rabbit-testartifactId> |
|  | testscope> |
|  | dependency> |
|  |  |
|  | org.springframework.bootgroupId> |
|  | spring-boot-starter-webartifactId> |
|  | dependency> |
|  |  |
|  | org.springframework.bootgroupId> |
|  | spring-boot-starter-testartifactId> |
|  | testscope> |
|  | dependency> |
|  | dependencies> |
|  |  |


3.9.2.2、yml文件配置

|  | # RabbitMQ的配置 |
|  | spring: |
|  | rabbitmq: |
|  | host: 自己服务器ip |
|  | port: 5672 |
|  | username: admin |
|  | password: admin |
|  | # 要是有Vhost也可以进行配置 |
|  |  |


3.9.2.4、RabbitMQ配置


|  | package cn.zixieqing.config; |
|  |  |
|  | import org.springframework.amqp.core.*; |
|  | import org.springframework.beans.factory.annotation.Qualifier; |
|  | import org.springframework.context.annotation.Bean; |
|  | import org.springframework.context.annotation.Configuration; |
|  |  |
|  | import java.util.HashMap; |
|  |  |
|  |  |
|  | @Configuration |
|  | public class MqConfig { |
|  |  |
|  |  |
|  | private static final String TTL_NORMAL_EXCHANGE = "X"; |
|  |  |
|  |  |
|  | private static final String TTL_DEAD_LETTER_EXCHANGE = "Y"; |
|  |  |
|  |  |
|  | private static final String TTL_NORMAL_QUEUE_A = "QA"; |
|  | private static final String TTL_NORMAL_QUEUE_B = "QB"; |
|  |  |
|  |  |
|  | private static final String TTL_DEAD_LETTER_QUEUE_D = "QD"; |
|  |  |
|  |  |
|  | private static final String TTL_NORMAL_EXCHANGE_BIND_QUEUE_A = "XA"; |
|  |  |
|  |  |
|  | private static final String TTL_NORMAL_EXCHANGE_BIND_QUEUE_B = "XB"; |
|  |  |
|  |  |
|  | private static final String TTL_NORMAL_QUEUE_AND_DEAD_LETTER_EXCHANGE_AND_DEAD_LETTER_QUEUE_BIND = "YD"; |
|  |  |
|  |  |
|  |  |
|  | @Bean("xExchange") |
|  | public DirectExchange xExchange() { |
|  | // 直接创建是什么类型的交换机 加上 交换机名字就可以了 |
|  | return new DirectExchange(TTL_NORMAL_EXCHANGE); |
|  |  } |
|  |  |
|  |  |
|  | @Bean("yExchange") |
|  | public DirectExchange yExchange() { |
|  | return new DirectExchange(TTL_DEAD_LETTER_EXCHANGE); |
|  |  } |
|  |  |
|  |  |
|  | @Bean("queueA") |
|  | public Queue queueA() { |
|  |  |
|  | // initialCapacity 这里的map大小值:(存的元素个数 / 负载因子0.75) + 1 |
|  |  HashMap params = new HashMap<>(5); |
|  |  params.put("x-dead-letter-exchange", TTL_DEAD_LETTER_EXCHANGE); |
|  |  params.put("x-dead-letter-routing-key", TTL_NORMAL_QUEUE_AND_DEAD_LETTER_EXCHANGE_AND_DEAD_LETTER_QUEUE_BIND); |
|  |  params.put("x-message-ttl", 10 * 1000); |
|  |  |
|  | // 构建队列 并 传入相应的参数 |
|  | return QueueBuilder.durable(TTL_NORMAL_QUEUE_A) |
|  |  .withArguments(params) |
|  |  .build(); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public Binding xChangeBindingQueueA(@Qualifier("queueA") Queue queueA, |
|  | @Qualifier("xExchange") DirectExchange xExchange) { |
|  | return BindingBuilder.bind(queueA) |
|  |  .to(xExchange) |
|  |  .with(TTL_NORMAL_EXCHANGE_BIND_QUEUE_A); |
|  |  } |
|  |  |
|  |  |
|  | @Bean("queueB") |
|  | public Queue queueB() { |
|  |  |
|  |  HashMap params = new HashMap<>(5); |
|  |  params.put("x-dead-letter-exchange", TTL_DEAD_LETTER_EXCHANGE); |
|  |  params.put("x-dead-letter-routing-key", TTL_NORMAL_QUEUE_AND_DEAD_LETTER_EXCHANGE_AND_DEAD_LETTER_QUEUE_BIND); |
|  |  params.put("x-message-ttl", 40 * 1000); |
|  |  |
|  | // 构建队列 并 传入相应的参数 |
|  | return QueueBuilder.durable(TTL_NORMAL_QUEUE_B) |
|  |  .withArguments(params) |
|  |  .build(); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public Binding xChangeBindingQueueB(@Qualifier("queueB") Queue queueB, |
|  | @Qualifier("xExchange") DirectExchange xExchange) { |
|  | return BindingBuilder.bind(queueB) |
|  |  .to(xExchange) |
|  |  .with(TTL_NORMAL_EXCHANGE_BIND_QUEUE_B); |
|  |  } |
|  |  |
|  |  |
|  | @Bean("queueD") |
|  | public Queue queueD() { |
|  | return new Queue(TTL_DEAD_LETTER_QUEUE_D); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public Binding yExchangeBindingQueueD(@Qualifier("queueD") Queue queueD, |
|  | @Qualifier("yExchange") DirectExchange yExchange) { |
|  | return BindingBuilder.bind(queueD) |
|  |  .to(yExchange) |
|  |  .with(TTL_NORMAL_QUEUE_AND_DEAD_LETTER_EXCHANGE_AND_DEAD_LETTER_QUEUE_BIND); |
|  |  } |
|  |  |
|  | } |
|  |  |


3.9.2.5、生产者

新加一个依赖


|  |  |
|  | com.alibabagroupId> |
|  | fastjsonartifactId> |
|  | 1.2.75version> |
|  | dependency> |
|  |  |


生产者伪代码


|  | package cn.zixieqing.controller; |
|  |  |
|  | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
|  | import org.springframework.beans.factory.annotation.Autowired; |
|  | import org.springframework.web.bind.annotation.GetMapping; |
|  | import org.springframework.web.bind.annotation.PathVariable; |
|  | import org.springframework.web.bind.annotation.RequestMapping; |
|  | import org.springframework.web.bind.annotation.RestController; |
|  |  |
|  | import java.util.Date; |
|  |  |
|  |  |
|  | @RestController |
|  | @RequestMapping("sendMsg") |
|  | public class MqProducerController { |
|  |  |
|  |  |
|  | @Autowired |
|  | private RabbitTemplate rabbitTemplate; |
|  |  |
|  | @GetMapping("{message}") |
|  | public void sendMsg(@PathVariable String message) { |
|  |  |
|  |  System.out.println( new Date() + ":接收到了消息===>" + message); |
|  |  |
|  | // 发送消息 |
|  |  rabbitTemplate.convertAndSend("X","XA","这条消息是来着TTL为10s的===>" + message); |
|  |  |
|  |  rabbitTemplate.convertAndSend("X","XB","这条消息是来着TTL为40s的===>" + message); |
|  |  } |
|  | } |
|  |  |


3.9.2.6、消费者

|  | package cn.zixieqing.consumer; |
|  |  |
|  |  |
|  | import com.rabbitmq.client.Channel; |
|  | import org.springframework.amqp.core.Message; |
|  | import org.springframework.amqp.rabbit.annotation.RabbitListener; |
|  | import org.springframework.stereotype.Component; |
|  |  |
|  | import java.nio.charset.StandardCharsets; |
|  | import java.util.Date; |
|  |  |
|  | @Component |
|  | public class DeadLetterQueueConsumer { |
|  |  |
|  | @RabbitListener(queues = "QD") |
|  | public void receiveMsg(Message message,Channel Channel) { |
|  |  System.out.println( new Date() + "接收到了消息===>" + |
|  | new String( message.getBody(), StandardCharsets.UTF_8)); |
|  |  } |
|  | } |
|  |  |


  • 但是:这种延迟队列有缺点
    • 当有很多请求,而延迟时间也都不一样时,那么就要写N多的这种代码了
3.9.3、RabbitMQ插件实现延迟队列
  • 插件下载地址:https://www.rabbitmq.com/community-plugins.html
  • github地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

  • 进入如下的目录中

|  |  cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.15/plugins # 版本号改成自己的 |
|  |  |


  • 把下载的插件上传进去

  • 启动插件

|  |  rabbitmq-plugins enable rabbitmq_delayed_message_exchange |
|  |  |


  • 重启rabbitMQ

|  |  systemctl restart rabbitmq-server |
|  |  |


  • 然后去web管理界面看exchange,就发现交换机类型多了一个

3.9.3.1、编写配置
  • 使用这种插件的方式,那么延迟设置就是在exchange交换机这一方进行设置,和以前在queue队列中进行延迟设置不一样

原来的延迟队列设置

使插件之后的延迟设置

  • 使用插件,实现下面的逻辑图


|  | package cn.zixieqing.config; |
|  |  |
|  | import org.springframework.amqp.core.Binding; |
|  | import org.springframework.amqp.core.BindingBuilder; |
|  | import org.springframework.amqp.core.CustomExchange; |
|  | import org.springframework.amqp.core.Queue; |
|  | import org.springframework.beans.factory.annotation.Qualifier; |
|  | import org.springframework.context.annotation.Bean; |
|  | import org.springframework.context.annotation.Configuration; |
|  |  |
|  | import java.util.HashMap; |
|  |  |
|  | @Configuration |
|  | public class DelayedExchanegConfig { |
|  |  |
|  |  |
|  | private static final String EXCHANGE_NAME = "delayed.exchange"; |
|  |  |
|  |  |
|  | private static final String QUEUE_NAME = "delayed.queue"; |
|  |  |
|  |  |
|  | private static final String EXCHANGE_BINDING_QUEUE_ROUTING_KEY = "delayed.routingkey"; |
|  |  |
|  |  |
|  |  |
|  | @Bean |
|  | public CustomExchange delayedExchange() { |
|  |  |
|  |  HashMap params = new HashMap<>(3); |
|  | // 延迟类型 |
|  |  params.put("x-delayed-type", "direct"); |
|  |  |
|  |  |
|  | return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, params); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public Queue delayedQueue() { |
|  | return new Queue(QUEUE_NAME); |
|  |  } |
|  |  |
|  |  |
|  | public Binding exchangeBindingQueue(@Qualifier("delayedExchange") CustomExchange delayedExchange, |
|  | @Qualifier("delayedQueue") Queue delayedQueue) { |
|  |  |
|  | return BindingBuilder |
|  |  .bind(delayedQueue) |
|  |  .to(delayedExchange) |
|  |  .with(EXCHANGE_BINDING_QUEUE_ROUTING_KEY) |
|  | // noargs()就是构建的意思 和 build()一样 |
|  |  .noargs(); |
|  |  } |
|  | } |
|  |  |


3.9.3.2、生产者

|  | package cn.zixieqing.controller; |
|  |  |
|  | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
|  | import org.springframework.beans.factory.annotation.Autowired; |
|  | import org.springframework.web.bind.annotation.GetMapping; |
|  | import org.springframework.web.bind.annotation.PathVariable; |
|  | import org.springframework.web.bind.annotation.RequestMapping; |
|  | import org.springframework.web.bind.annotation.RestController; |
|  |  |
|  | import java.util.Date; |
|  |  |
|  |  |
|  | @RestController |
|  | @RequestMapping("sendMsg") |
|  | public class DelatedQueueController { |
|  |  |
|  | @Autowired |
|  | private RabbitTemplate rabbitTemplate; |
|  |  |
|  | @GetMapping("/{message}/{ttl}") |
|  | public void getMesg(@PathVariable String message, @PathVariable int ttl) { |
|  |  |
|  |  System.out.println(new Date() + "接收到了消息===>" + message + "===>失效时间为:" + ttl); |
|  |  |
|  | // 发送消息 |
|  |  rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", data->{ |
|  | // 设置失效时间 |
|  |  data.getMessageProperties().setDelay(10 * 1000); |
|  | return data; |
|  |  }); |
|  |  } |
|  | } |
|  |  |


3.9.3.3、消费者

|  | package cn.zixieqing.consumer; |
|  |  |
|  | import org.springframework.amqp.core.Message; |
|  | import org.springframework.amqp.rabbit.annotation.RabbitListener; |
|  | import org.springframework.stereotype.Component; |
|  |  |
|  | import java.nio.charset.StandardCharsets; |
|  | import java.util.Date; |
|  |  |
|  | @Component |
|  | public class DelayedQueueConsumer { |
|  |  |
|  | @RabbitListener(queues = "delayed.queue") |
|  | public void receiveMessage(Message message) { |
|  |  System.out.println("消费者正在消费消息......"); |
|  | String msg = new String(message.getBody(), StandardCharsets.UTF_8); |
|  |  System.out.println(new Date() + "消费了消息===>" + message); |
|  |  } |
|  | } |
|  |  |


  • 发送两次消息,然后把传的TTL弄成不一样的,那么:TTL值小的消息就会先被消费,然后到了指定时间之后,TTL长的消息再消费
3.10、发布确认 - 续 3.10.1、ConfirmCallback() 和 ReturnCallback()
  • 正常的流程应该是下面的样子

  • 但是:如果交换机出问题了呢,总之就是交换机没有接收到生产者发布的消息( 如:发消息时,交换机名字搞错了 ),那消息就直接丢了吗?
  • 同理:要是队列出问题了呢,总之也就是交换机没有成功地把消息推到队列中( 如:routing key搞错了 ),咋办?
  • 而要解决这种问题,就需要使用标题中使用的两个回调,从而:让架构模式变成如下的样子

ConfirmCallback() 和 ReturnCallback()的配置

  • 在yml文件中添加如下内容

|  | spring: |
|  | rabbitmq: |
|  | # 发布确认类型 |
|  | publisher-confirm-type: correlated |
|  | # 队列未收到消息时,触发returnCallback回调 |
|  | publisher-returns: true |
|  |  |


  • 编写ConfirmCallback 和 returnCallback回调接口( 伪代码 ) - 注意点:这两个接口是RabbitTemplate的内部类( 故而:就有大文章 )

|  | @Component |
|  | public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback { |
|  |  |
|  | @Autowired |
|  | private RabbitTemplate rabbitTemplate; |
|  |  |
|  |  |
|  | @PostConstruct |
|  | public void init(){ |
|  |  rabbitTemplate.setConfirmCallback(this); |
|  |  rabbitTemplate.setReturnCallback(this); |
|  |  } |
|  |  |
|  |  |
|  | @Override |
|  | public void confirm(CorrelationData correlationData, boolean ack, String cause) { |
|  | if(ack){ |
|  |  System.out.println("消息已经送达到Exchange"); |
|  |  }else{ |
|  |  System.out.println("消息没有送达到Exchange"); |
|  |  } |
|  |  } |
|  |  |
|  |  |
|  | @Override |
|  | public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { |
|  |  System.out.println("消息没有送达到Queue"); |
|  |  } |
|  | } |
|  |  |


  • 生产者调用的方法是:rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message, CorrelationData correlationData)
    • 多了一个CorrelationData 参数,这个参数携带的就是消息相关信息
3.11、备份交换机
  • 这个玩意儿也是为了解决前面发布确认中队列出问题的方案

  • 注意:这种方式优先级比前面的 ReturnCallback回退策略要高( 演示:跳过 - 可以采用将这二者都配置好,然后进行测试,结果是备份交换机的方式会优先执行,而前面的回退策略的方式并不会执行 )

  • 采用备份交换机时的架构图

上图架构的伪代码配置编写


|  | package cn.zixieqing.config; |
|  |  |
|  | import org.springframework.amqp.core.*; |
|  | import org.springframework.beans.factory.annotation.Qualifier; |
|  | import org.springframework.context.annotation.Bean; |
|  | import org.springframework.context.annotation.Configuration; |
|  |  |
|  | @Configuration |
|  | public class AlternateExchangeConfig { |
|  |  |
|  |  |
|  | private static final String NORMAL_EXCHANGE_NAME = "normal_exchange"; |
|  |  |
|  |  |
|  | private static final String NORMAL_QUEUE_NAME = "normal_queue"; |
|  |  |
|  |  |
|  | private static final String ALTERNATE_EXCHANGE_NAME = "alternate_exchange"; |
|  |  |
|  |  |
|  | private static final String ALTERNATE_QUEUE_NAME = "alternate_queue"; |
|  |  |
|  |  |
|  | private static final String WARNING_QUEUE_NAME = "warning_queue"; |
|  |  |
|  |  |
|  | @Bean |
|  | public DirectExchange confirmExchange() { |
|  |  |
|  | return ExchangeBuilder |
|  |  .directExchange(NORMAL_EXCHANGE_NAME) |
|  |  .durable(true) |
|  | // 绑定备份交换机 |
|  |  .withArgument("alternate-exchange", ALTERNATE_EXCHANGE_NAME) |
|  |  .build(); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public Queue confirmQueue() { |
|  | return new Queue(NORMAL_QUEUE_NAME); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public Binding confirmExchangeBindingConfirmQueue(@Qualifier("confirmExchange") DirectExchange confirmExchange, |
|  | @Qualifier("confirmQueue") Queue confirmQueue) { |
|  | return BindingBuilder |
|  |  .bind(confirmQueue) |
|  |  .to(confirmExchange) |
|  |  .with("routingkey"); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public FanoutExchange alternateExchange() { |
|  | return new FanoutExchange(ALTERNATE_EXCHANGE_NAME); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public Queue alternateQueue() { |
|  | return QueueBuilder |
|  |  .durable(ALTERNATE_QUEUE_NAME) |
|  |  .build(); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public Queue warningQueue() { |
|  | return new Queue(WARNING_QUEUE_NAME); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public Binding alternateExchangeBindingAlternateQueue(@Qualifier("alternateQueue") Queue alternateQueue, |
|  | @Qualifier("alternateExchange") FanoutExchange alternateExchange) { |
|  | return BindingBuilder |
|  |  .bind(alternateQueue) |
|  |  .to(alternateExchange); |
|  |  } |
|  |  |
|  |  |
|  | @Bean |
|  | public Binding alternateExchangeBindingWarningQueue(@Qualifier("warningQueue") Queue warningQueue, |
|  | @Qualifier("alternateExchange") FanoutExchange alternateExchange) { |
|  | return BindingBuilder |
|  |  .bind(warningQueue) |
|  |  .to(alternateExchange); |
|  |  } |
|  | } |
|  |  |


  • 后续的操作就是差不多的,生产者发送消息,消费者消费消息,然后里面再做一些业务的细节处理就可以了
3.12、优先级队列
  • 这就是为了让MQ队列中的某个 / 某些消息能够优先被消费

  • 使用场景:搞内幕,让某个人 / 某些人一定能够抢到什么商品

  • 想要实现优先级队列,需要满足如下条件:

    • 1、队列本身设置优先级( 在声明队列是进行参数配置 )

      • 
        

      | | |
      | | @Bean |
      | | public Queue alternateQueue() { |
      | | // 空间大小: ( map存储的元素个数 / 0.75 ) + 1 |
      | | HashMap params = new HashMap<>(3); |
      | | params.put(“x-max-priority”, 10); |
      | | return QueueBuilder |
      | | .durable(ALTERNATE_QUEUE_NAME).withArguments(params) |
      | | .build(); |
      | | } |
      | | |

       
    • 2、让消息有优先级

      • 
        

      | | |
      | | // 发送消息 |
      | | rabbitTemplate.convertAndSend(“normal.exchange”, “normal.routingkey”, data->{ |
      | | // 消息设置优先级 - 注意:这个数值不能比前面队列设置的那个优先级数值大,即:这里的消息优先级范围就是前面队列中设置的(0, 10) |
      | | data.getMessageProperties().setPriority(5); |
      | | return data; |
      | | }); |
      | | |

      
      
  • 注意点:设置了优先级之后,需要做到如下条件:

    • 需要让消息全部都发到队列之后,才可以进行消费,原因:消息进入了队列,是会重新根据优先级大小进行排队,从而让优先级数值越大越在前面
3.13、惰性队列
  • 这玩意儿指的就是让消息存放在磁盘中

  • 正常情况下是如下的样子

  • 但是:如果此时发送的消息是成千上万条,并且消费者出故障了( 下线、宕机、维护从而关闭 ),那么这些成千上万的消息就会堆积在MQ中,怎么办?就需要像下面这么搞

设置惰性队列的配置


|  |  |
|  |  |
|  |  Map params = new HashMap(); |
|  |  params.put("x-queue-mode", "lazy"); |
|  |  channel.queueDeclare("hello", true, false, false, params); |
|  |  |
|  |  |
|  |  |
|  | @Bean |
|  | public Queue alternateQueue() { |
|  | // 空间大小: ( map存储的元素个数 / 0.75 ) + 1 |
|  |  HashMap params = new HashMap<>(3); |
|  |  params.put("x-queue-mode", "lazy"); |
|  | return QueueBuilder |
|  |  .durable(ALQUEUE_NAME).withArguments(params) |
|  |  .build(); |
|  |  } |
|  |  |


  • 经过如上配置之后,那么内存中记录的就是指向磁盘的引用地址,而真实的数据是在磁盘中,下一次消费者恢复之后,就可以从磁盘中读取出来,然后再发给消费者( 缺点:得先读取,然后发送,这性能很慢,但是:处理场景就是消费者挂彩了,不再消费消息时存储数据的情景 )

作者:紫邪情欢迎任何形式的转载,但请务必注明出处。限于本人水平,如果文章和代码有表述不当之处,还请不吝赐教。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/884305.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号