栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ整合SpringBoot

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ整合SpringBoot

文章目录
  • RabbitMQ整合SpringBoot
    • 导入Maven依赖
    • 配置文件
    • 队列与死信队列
      • 通过配置文件 添加Bean创建交换机以及队列
        • config
        • controller
        • consumer
    • 手动ACK应达
      • 消费者案例
    • RabbitMQ的交换机插件
      • 使用RabbitMQ中的插件来解决以上的生产者指定过期时间的弊端
        • 插件下载以及Docker安装
          • Docker安装
      • 创建插件类型的交换机
        • config
        • controller
        • sonsumer
    • 配置回调
      • callback
    • 备份交换机
      • Config
      • controller
      • consumer
    • 优先级队列
      • config
      • controller
      • consumer

RabbitMQ整合SpringBoot

RabbitMQ官网

导入Maven依赖

  org.springframework.boot
  spring-boot-starter-amqp




  org.springframework.amqp
  spring-rabbit-test
  test

配置文件
# RabbitMQ
spring:
  rabbitmq:
    port: 5672
    addresses: 116.62.113.241
    virtual-host: /
    username: guest
    password: guest
    # 确认消息已发送大宋交换机(Exchange)选中确认模式为交互
    publisher-/confirm/i-type: correlated
    # 开启发送端消息抵达队列的确认
    publisher-returns: true
    # 只要抵达队列,以异步发送优先回调我们这个returnsfirm
    template:
      mandatory: true
    # 手动ack消息
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: auto
        # 并发消费者初始化值
        concurrency: 1
        # 并发消费者的最大值
        max-concurrency: 10
        # 每个消费者每次监听时可拉取处理的消息数量
        # 在单个请求中处理的消息个数,他应该大于等于事务数量(unAck的最大数量)
        prefetch: 1
        # 是否支持重试
        retry:
          enabled: true
  • spring.rabbitmq.publisher-/confirm/i-type
    • NONE 禁用发布确认模式,是默认值
    • CORRELATED 发布消息成功到交换器后会触发回调方法
    • SIMPLE
      • 经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法
      • 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
队列与死信队列

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间 由生产者指定时间

通过配置文件 添加Bean创建交换机以及队列 config
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;




@Configuration
public class CreateMQConfig {

    
    // 普通交换机的名称
    public static final String ORDINARY_EXCHANGE_X = "X";
    // 普通队列名称
    public static final String ORDINARY_QUEUE_A = "QA";
    public static final String ORDINARY_QUEUE_B = "QB";
    // 死信交换机的名称
    public static final String DEAD_LETTER_EXCHANGE_Y = "Y";
    // 死信队列名称
    public static final String DEAD_LETTER_QUEUE = "QD";


    
    @Bean
    public DirectExchange XExchange() {
        return new DirectExchange(ORDINARY_EXCHANGE_X);
    }

    
    @Bean
    public DirectExchange YExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE_Y);
    }


    
    @Bean
    public Queue queueA() {
        
        Map arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_Y);
        arguments.put("x-dead-letter-routing-key", "YD");
        arguments.put("x-message-ttl", 10000);
        return QueueBuilder.durable(ORDINARY_QUEUE_A).withArguments(arguments).build();
    }

    
    @Bean
    public Queue queueB() {
        
        Map arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_Y);
        arguments.put("x-dead-letter-routing-key", "YD");
        arguments.put("x-message-ttl", 40000);
        return QueueBuilder.durable(ORDINARY_QUEUE_B).withArguments(arguments).build();
    }

    
    @Bean
    public Queue queueQD() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }


    
    @Bean
    public Binding queueABindingX(Queue queueA, DirectExchange XExchange) {
        return BindingBuilder.bind(queueA).to(XExchange).with("XA");
    }

    
    @Bean
    public Binding queueBBindingX(Queue queueB, DirectExchange XExchange) {
        return BindingBuilder.bind(queueB).to(XExchange).with("XB");
    }

    
    @Bean
    public Binding queueQDBindingY(Queue queueQD, DirectExchange YExchange) {
        return BindingBuilder.bind(queueQD).to(YExchange).with("YD");
    }

    // ----------增加一个队列 不设置过期时间 过期时间由生产者指定 绑定 X 交换机 通过路由 XC 到QC队列----
    
  // 普通队列名称
    public static final String ORDINARY_QUEUE_QC = "QC";

    
    @Bean
    public Queue queueQC() {
        
        Map arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_Y);
        arguments.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(ORDINARY_QUEUE_QC).withArguments(arguments).build();
    }

    
    @Bean
    public Binding queueQCBindingX(Queue queueQC, DirectExchange XExchange) {
        return BindingBuilder.bind(queueQC).to(XExchange).with("XC");
    }

}
controller

sendMessage 向X交换机发送消息 分别通过不同的路由 到指定队列中

sendMessageAssignTTL 由生产者指定消息的发送时间有弊端 因为RabbitMQ只会检查第一个消息是否过期 可以通过插件延迟队列进行弥补

import com.llc.rabbitmq.common.api.R;
import com.llc.rabbitmq.config.CreatePlugInDelayedMQConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;


@Api(tags = "RabbitMQ消息发送API")
@Slf4j
@RestController
@RequestMapping("/v1/rabbitmq")
public class SendMsgController {

    private final
    RabbitTemplate rabbitTemplate;

    public SendMsgController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @ApiOperation(value = "向RabbitMQ中发送消息")
    @ApiImplicitParam(name = "message", value = "消息内容", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path")
    @GetMapping("/send/{message}")
    public R sendMessage(@PathVariable String message) {
        log.info("当前时间:{}  ---  发送一条消息给两个TTL队列 | {}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10S的队列" + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为40S的队列" + message);
        return R.ok("消息发送成功");
    }

    
    @ApiOperation(value = "向RabbitMQ中发送消息,指定消息的过期时间TTL")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "message", value = "消息内容", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path"),
            @ApiImplicitParam(name = "ttl_time", value = "过期时间", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path")
    })
    @GetMapping("/send/{message}/{ttl_time}")
    public R sendMessageAssignTTL(@PathVariable String message, @PathVariable String ttl_time) {
        log.info("当前时间:{}  ---  发送一条消息给QC队列 | {} | 过期时间 - {}MS", new Date(), message, ttl_time);
        rabbitTemplate.convertAndSend("X", "XC", "消息自定义过期时间为" + ttl_time + "的队列" + message, (msg) -> {
            // 设置消息消费的延迟时间
            msg.getMessageProperties().setExpiration(ttl_time);
            return msg;
        });
        return R.ok("消息发送成功");
    }
}

consumer
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;


@Slf4j
@Component
public class DeadLetterQueueConsumer {

    // 指定监听的队列名
    @RabbitListener(queues = {"QD"})
    public void receiveD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{} --- 收到死信队列的消息 | {}", new Date(), msg);

    }
}
手动ACK应达
  • 配置开启

    # 手动应答
    spring.listener.simple.acknowledge-mode=manual
    
消费者案例

在消费者中手动进行消息应答

channel.basicAck 签收

channel.basicNack 拒签

import com.llc.rabbitmq.config.PriorityMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Slf4j
@Component
public class PriorityQueueConsumer {

    @RabbitListener(queues = {PriorityMQConfig.PRIORITY_QUEUE_NAME})
    public void receivePriorityQueue(Message message, String body, Channel channel) {
        log.info("《优先级》队列消费者 message==>[{}],body==>[{}]", message, body);
        //获取交货标签 内容安顺序自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            channel.basicAck(deliveryTag, false);
            
            if (deliveryTag % 2 == 0) {
                channel.basicNack(deliveryTag, false, true);
//                channel.basicReject(); //另外一种拒签方法
            }
        } catch (IOException e) {
            log.error("网络终端了", e);
        }
    }
}
RabbitMQ的交换机插件 使用RabbitMQ中的插件来解决以上的生产者指定过期时间的弊端

插件下载以及Docker安装

解决队列先进先出的情况,比如第一个进到队列的到期时间是30分钟,第二个进入队列的到期时间是20分钟,但是由于队列的先进先出原则,第二个进入的会被阻塞了,等到第一个到期了才会被一起延迟处理

原理:之前的的延迟是在队列中做的通过设置队列的TTL来延迟推送给消费者

​ 而现在的延迟是在交换机中做,交换机延迟推送给队列,队列接收到消息直接推送给消费者,来弥补队列做延迟的弊端

  • 插件 rabbitmq_delayed_message_exchange

    插件下载地址

Docker安装
docker run  --name rabbitmq --restart=always 
-p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 
-d rabbitmq:management
# 4369,25672 (Erlang发现&集群端口) | 5672,5671(AMQP端口) | 15672(web管理后台端口) | 61613,61614( STOMP协议端口) | 1883,8883(MQTT协议端口)
# 默认密码 guest
# 访问地址 http://127.0.0.1:15672
# 安装插件
## 将插件上传到服务器中
docker cp /dockeres/rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
## 进入容器启动插件
docker exec -it rabbitmq bash
## 启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
## 禁用插件
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
## 重启容器
docker restart rabbitmq
创建插件类型的交换机 config
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class CreatePlugInDelayedMQConfig {
    
    public static final String DEFAULT_EXCHANGE_NAME = "delayed.exchange";
    public static final String DEFAULT_QUEUE_NAME = "delayed.queue";
    public static final String DEFAULT_ROUTING_KEY = "delayed.routingKey";

    
    @Bean
    public CustomExchange delayedExchange() {
        Map arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        
        return new CustomExchange(DEFAULT_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    }

    
    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(DEFAULT_QUEUE_NAME).build();
    }

    
    @Bean
    public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DEFAULT_ROUTING_KEY).noargs();
    }

}

controller
import com.llc.rabbitmq.common.api.R;
import com.llc.rabbitmq.config.CreatePlugInDelayedMQConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;


@Api(tags = "RabbitMQ消息发送API")
@Slf4j
@RestController
@RequestMapping("/v1/rabbitmq")
public class SendMsgController {

    private final
    RabbitTemplate rabbitTemplate;

    public SendMsgController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    
    @ApiOperation(value = "向RabbitMQ中发送消息,基于插件的延迟对列")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "message", value = "消息内容", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path"),
            @ApiImplicitParam(name = "delay_time", value = "延迟时间", required = true, dataType = "Integer", dataTypeClass = Integer.class, paramType = "path")
    })
    @GetMapping("/send/plug_in/{message}/{delay_time}")
    public R sendMessagePlugInAssignTTL(@PathVariable String message, @PathVariable Integer delay_time) {
        log.info("当前时间:{}  ---  发送一条消息给QC队列 | {} | 过期时间 - {}MS", new Date(), message, delay_time);
        rabbitTemplate.convertAndSend(CreatePlugInDelayedMQConfig.DEFAULT_EXCHANGE_NAME, CreatePlugInDelayedMQConfig.DEFAULT_ROUTING_KEY, message, msg -> {
            // 设置延迟时间
            msg.getMessageProperties().setDelay(delay_time);
            return msg;
        });
        return R.ok("消息发送成功");
    }

}
sonsumer
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;


@Slf4j
@Component
public class DelayQueueConsumer {
    @RabbitListener(queues = {"delayed.queue"})
    public void delayQueueConsumer(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{} --- 收到延迟队列的消息 | {}", new Date(), msg);
    }
}

配置回调
  • 消息准确发送到交换机的确认回调

  • 消息未抵达队列的确认回调

callback
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;



@Slf4j
@Component
public class MessageSendExchangeCallback {

    private final RabbitTemplate rabbitTemplate;

    public MessageSendExchangeCallback(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    
    //@PostConstruct--RabbitMqConfig方法创建完成后,执行这个方法
    @PostConstruct
    public void initRabbitTemplate() {
        //设置确认回调
        rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
            
            @Override
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                log.warn("[[[交换机消息抵达确认]]]  /confirm/i...[{}]===>ack[{}]==>cause[{}]]", correlationData, ack, cause);
            }
        });

        //设置消息未抵达队列的确认回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println();
                log.warn("[[[消息未抵达队列]]]  message...[{}]===>replyCode[{}]==>replyText[{}]==>exchange[{}]==>routingKey[{}]", message, replyCode, replyText, exchange, routingKey);
            }
        });


    }
}
备份交换机

消息推送失败后将消息投递给备份交换机,通过备份交换机来进行对应处理

⚠️mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先 级高,经过上面结果显示答案是备份交换机优先级高。

Config
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class /confirm/iConfig {
    
    public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";
    public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";
    public static final String /confirm/i_ROUTING_KEY = "/confirm/i.routingKey";

    
    @Bean
    public DirectExchange /confirm/iExchange() {
        return ExchangeBuilder.directExchange(/confirm/i_EXCHANGE_NAME)
                .durable(true)
                //设置该交换机的备份交换机
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
                .build();
    }


    
    @Bean
    public Queue /confirm/iQueue() {
        return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build();
    }

    
    @Bean
    public Binding queueBindingExchange(Queue /confirm/iQueue, DirectExchange /confirm/iExchange) {
        return BindingBuilder.bind(/confirm/iQueue).to(/confirm/iExchange).with(/confirm/i_ROUTING_KEY);
    }

    
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    // 备份队列
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    // 警告队列
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    
    @Bean
    public FanoutExchange backupExchange() {
        return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE_NAME).build();
    }


    
    @Bean
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    
    @Bean
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }


    
    @Bean
    public Binding backupQueueBindingBackupExchange(Queue backupQueue, FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }


    
    @Bean
    public Binding warningQueueBindingBackupExchange(Queue warningQueue, FanoutExchange backupExchange) {
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

controller
import com.llc.rabbitmq.common.api.R;
import com.llc.rabbitmq.config./confirm/iConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;


@Api(tags = "RabbitMQ消息发送并确认API")
@Slf4j
@RestController
@RequestMapping("/v1/rabbitmq//confirm/i")
public class ProducerController {

    private final RabbitTemplate rabbitTemplate;

    public ProducerController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }


    @ApiOperation(value = "向RabbitMQ中发送消息,并确认消息发送")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "message", value = "消息内容", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path")
    })
    @GetMapping("/send/{message}")
    public R sendMessage/confirm/i(@PathVariable String message) {
        // 构造一个消息确认回调的相关数据
        String msgId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(msgId);
        correlationData.setReturnedMessage(
                MessageBuilder.withBody(message.getBytes())
                        .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                        .setCorrelationId(msgId)
                        .build()
        );
        rabbitTemplate.convertAndSend(/confirm/iConfig./confirm/i_EXCHANGE_NAME, /confirm/iConfig./confirm/i_ROUTING_KEY+"1", message, correlationData);
        log.info("发送消息的内容 | {}", message);
        return R.ok("消息发送成功");
    }
}
consumer
import com.llc.rabbitmq.config./confirm/iConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class /confirm/iExchangeConsumer {

    
    @RabbitListener(queues = {/confirm/iConfig./confirm/i_QUEUE_NAME})
    public void receive/confirm/iQueue(Message message, String body, Channel channel) {
        log.info("确认队列消费者 message==>[{}],body==>[{}]", message, body);
    }

    
    @RabbitListener(queues = {/confirm/iConfig.BACKUP_QUEUE_NAME})
    public void receiveBackupQueue(Message message, String body, Channel channel) {
        log.info("备份交换机 《备份》队列消费者 message==>[{}],body==>[{}]", message, body);
    }

    
    @RabbitListener(queues = {/confirm/iConfig.WARNING_QUEUE_NAME})
    public void receiveWarningQueue(Message message, String body, Channel channel) {
        log.info("备份交换机 《警告》队列消费者 message==>[{}],body==>[{}]", message, body);
    }
}
优先级队列

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如 果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall 商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创 造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存 放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,

所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。

⚠️要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序

config
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class PriorityMQConfig {

    public static final String PRIORITY_EXCHANGE_NAME = "priority.exchange";
    public static final String PRIORITY_QUEUE_NAME = "priority.queue";
    public static final String PRIORITY_ROUTING_KEY = "priority.routingKey";


    
    @Bean
    public DirectExchange priorityExchanger() {
        return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE_NAME).durable(true).build();
    }

    
    @Bean
    public Queue priorityQueue() {
        Map arguments = new HashMap<>();
        // 官方运行的范围优先级为0-255,此处设置10
        arguments.put("x-max-priority", 10);

        return QueueBuilder.durable(PRIORITY_QUEUE_NAME).withArguments(arguments).build();
    }

    
    @Bean
    public Binding priorityQueueBindingExchange(Queue priorityQueue, DirectExchange priorityExchanger) {
        return BindingBuilder.bind(priorityQueue).to(priorityExchanger).with(PRIORITY_ROUTING_KEY);
    }

}

controller
import com.llc.rabbitmq.common.api.R;
import com.llc.rabbitmq.config.PriorityMQConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@Api(tags = "RabbitMQ优先级消息API")
@Slf4j
@RestController
@RequestMapping("/v1/rabbitmq/priority")
public class PriorityController {

    private final
    RabbitTemplate rabbitTemplate;

    public PriorityController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @ApiOperation(value = "向RabbitMQ中发送消息10条消息")
    @ApiImplicitParam(name = "message", value = "消息内容", required = true, dataType = "String", dataTypeClass = String.class, paramType = "path")
    @GetMapping("/send/{message}")
    public R sendMessage(@PathVariable String message) {
        for (int i = 0; i < 10; i++) {
            // 随机数生产
            int level = (int) (Math.random() * 10);
            log.info("消息==>[{}-{}],优先级==>[{}]", message, i, level);
            rabbitTemplate.convertAndSend(
                    PriorityMQConfig.PRIORITY_EXCHANGE_NAME,
                    PriorityMQConfig.PRIORITY_ROUTING_KEY,
                    "优先级消息==>[" + message + "]" + i + "优先级===>[" + level + "]",
                    mes -> {
                        mes.getMessageProperties().setPriority(level);
                        return mes;
                    });
        }
        return R.ok("消息发送成功");
    }

}
consumer
import com.llc.rabbitmq.config.PriorityMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class PriorityQueueConsumer {

    @RabbitListener(queues = {PriorityMQConfig.PRIORITY_QUEUE_NAME})
    public void receivePriorityQueue(Message message, String body, Channel channel) {
        log.info("《优先级》队列消费者 message==>[{}],body==>[{}]", message, body);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/459110.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号