栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot使用RabbitMQ延时队列(小白必备)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot使用RabbitMQ延时队列(小白必备)

1.什么是MQ

MQ,是一种跨进程的通信机制,用于上下游传递消息。

在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。

使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

为什么会产生消息列队?

  1. 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
  2. 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

延时列队的使用场景?

  1. 订单业务:在淘宝或者京东购买东西,用户下单后未付款则30分钟后取消订单。
  2. 短信通知:手机用户交完话费后,几分钟之内将会收到缴费信息

2.什么是RabbitMQ(这里就做了一下简单介绍)

RabbitMQ是一种消息队列 ,用于常见的进程通信。支持点对点,请求应答和发布订阅模式 并且提供多种语言的支持。常见的java,c#,php都支持。

常被用在异步处理,应用解耦。流量消锋等复杂的业务场景中。和java的kafka一样都属于消息中间件。

下载地址:

https://www.rabbitmq.com/download.html

进入RabbitMQ官网

1.第一步

第二步

下载好后不要着急安装RabbitMQ,我们这里还需要安装Erlang

下载地址:http://www.erlang.org/download/otp_win64_17.3.exe

安装步骤

步骤一

步骤二

步骤三

步骤四

安装完成

现在安装RabbitMQ

步骤一

步骤二

步骤三

安装完成

启动RabbitMQ管理工具

开始菜单 — 最新添加 — 展开 — 选中双击

输入命令:rabbitmq-plugins enable rabbitmq_management

效果如果图

在浏览器中输入地址查看:http://127.0.0.1:15672/


出现次页面代表成功,默认用户和密码都是guest/ guest

若不出现此页面,就是安装失败了,不要慌,多半问题在系统用户名必须是中文(放心有解决办法):

Windows下安装RabbitMQ后,按正常RabbitMQ会自动注册服务并自动启动,但是如果有的道友不注意中英文目录就会出现服务启动后几秒钟自动停止,而且反反复复。

出现这种情况一般都是由我们的用户名是中文,而导致默认的DB和log访问出现问。所以我建议以后大家在使用windows操作系统的时候尽量用英文来命名文件或目录,这样会极大的减小以后安装软件出现莫名其妙的问题的bug。

接下来我们先卸载我们的RabbitMQ,然后在我们的系统变量里设置一个RABBITMQ_base 的变量路径为一个不含英文的路径 比如 E:rabbit,最后我们重新安装RabbitMQ即可,然后就会看到RabbitMQ服务自动注册了,并且不会自动停止。

SpringBoot整合RabbitMQ

1.添加依赖

pom.xml中添加 spring-boot-starter-amqp的依赖

 
      
   org.springframework.boot
 spring-boot-starter-amqp
 

其他依赖


 org.springframework.boot
 spring-boot-starter-web
      
  
      
 org.projectlombok
 lombok
 true
      
  
      
 org.springframework.boot
 spring-boot-starter-test
 test
 
   
     org.junit.vintage
     junit-vintage-engine
   
 
      
      
      
 junit
 junit
 4.12
 test
      

application.yml文件中配置rabbitmq相关内容

spring:
  rabbitmq:
   host: localhost
   port: 5672
   username: guest
   password: guest

这里我们环境就搭建起来了

2.具体编码实现

配置列队

 package com.example.spring_boot_rabbitmq;
  
  
  
  import lombok.extern.slf4j.Slf4j;
  import org.springframework.amqp.core.*;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  
  import java.util.HashMap;
  import java.util.Map;
  
  
  
  @Configuration
  @Slf4j
  public class DelayRabbitConfig {
  
    
    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
    
    public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
    
    public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
  
    public static final String ORDER_QUEUE_NAME = "user.order.queue";
    public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
    public static final String ORDER_ROUTING_KEY = "order";
  
    
    @Bean
    public Queue delayOrderQueue() {
      Map params = new HashMap<>();
      // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
      params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
      // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
      params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
      return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
    }
    
    @Bean
    public DirectExchange orderDelayExchange() {
      return new DirectExchange(ORDER_DELAY_EXCHANGE);
    }
    @Bean
    public Binding dlxBinding() {
      return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
    }
  
    @Bean
    public Queue orderQueue() {
      return new Queue(ORDER_QUEUE_NAME, true);
    }
    
    @Bean
    public TopicExchange orderTopicExchange() {
      return new TopicExchange(ORDER_EXCHANGE_NAME);
    }
  
    @Bean
    public Binding orderBinding() {
      // TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键
      return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
    }
  
  }

创建一个Order实体类

package com.example.spring_boot_rabbitmq.pojo;
 
 import lombok.Data;
 
 import java.io.Serializable;
 
 
 @Data
 public class Order implements Serializable {
   private static final long serialVersionUID = -2221214252163879885L;
 
   private String orderId; // 订单id
 
   private Integer orderStatus; // 订单状态 0:未支付,1:已支付,2:订单已取消
 
   private String orderName; // 订单名字
 
 }

接收者

package com.example.spring_boot_rabbitmq;
 
 import com.example.spring_boot_rabbitmq.pojo.Order;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
 
 
 
 @Component
 @Slf4j
 public class DelayReceiver {
   @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
   public void orderDelayQueue(Order order, Message message, Channel channel) {
     log.info("###########################################");
     log.info("【orderDelayQueue 监听的消息】 - 【消费时间】 - [{}]- 【订单内容】 - [{}]", new Date(), order.toString());
     if(order.getOrderStatus() == 0) {
order.setOrderStatus(2);
log.info("【该订单未支付,取消订单】" + order.toString());
     } else if(order.getOrderStatus() == 1) {
log.info("【该订单已完成支付】");
     } else if(order.getOrderStatus() == 2) {
log.info("【该订单已取消】");
     }
     log.info("###########################################");
   }
 
 }

发送者

 package com.example.spring_boot_rabbitmq;
 
 
 import com.example.spring_boot_rabbitmq.pojo.Order;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
 
 
 @Component
 @Slf4j
 public class DelaySender {
   @Autowired
   private AmqpTemplate amqpTemplate;
 
   public void sendDelay(Order order) {
     log.info("【订单生成时间】" + new Date().toString() +"【1分钟后检查订单是否已经支付】" + order.toString() );
     this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
// 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
return message;
     });
   }
 
 }

测试,访问http://localhost:8080/sendDelay查看日志输出

package com.example.spring_boot_rabbitmq;

import com.example.spring_boot_rabbitmq.pojo.Order;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;




@RestController
public class TestController {
  @Autowired
  private DelaySender delaySender;

  @GetMapping("/sendDelay")
  public Object sendDelay() {
    Order order1 = new Order();
    order1.setOrderStatus(0);
    order1.setOrderId("123456");
    order1.setOrderName("小米6");

    Order order2 = new Order();
    order2.setOrderStatus(1);
    order2.setOrderId("456789");
    order2.setOrderName("小米8");

    delaySender.sendDelay(order1);
    delaySender.sendDelay(order2);
    return "ok";
  }

}

输出


到此已经SpringBoot使用RabbitMQ延时队列已经完成,希望对你有所帮助,若有地方不理解或者有更好的办法请留言,谢谢。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/135745.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号