
Delayed Order Consumption with RabbitMQ, and Using the Delayed Message Plugin



First, set up a RabbitMQ service with Docker
Setting up RabbitMQ with docker-compose
  • Write the docker-compose file
version: '3'
services:

  # RabbitMQ service
  breeze-rabbitmq:
    image: rabbitmq:3.9.13-management
    container_name: breeze-rabbitmq
    networks:
      - breeze-net
    hostname: breeze-rabbitmq
    restart: always
    ports:
      - "4369:4369"
      - "15672:15672"
      - "5672:5672"
      - "25672:25672"
    volumes:
      - "./docker/rabbitmq/data:/var/lib/rabbitmq"
      - "./docker/rabbitmq/log:/var/log/rabbitmq/log"

networks:
  breeze-net:
    external: false
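  • Start the service with docker compose; the image is pulled automatically on first run
    docker compose up -d breeze-rabbitmq
  • Wait for the container to start, then open http://localhost:15672/ in a browser

  • Log in with guest/guest and, under the Admin menu, create the account to be used: admin/admin with the administrator tag
  • Grant the admin user permissions on the virtual host

  • The single-node RabbitMQ setup is now complete (for development use)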
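One way to confirm that the broker is reachable and that the new account works is to call the management plugin's HTTP API. The sketch below is a minimal, standard-library-only check (Java 11+). It assumes the admin/admin account and the 15672 port mapping described above; the class and method names are illustrative only.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class RabbitMqOverviewCheck {

    // Builds an HTTP Basic "Authorization" header value for the given credentials.
    static String basicAuthHeader(String user, String password) {
        String raw = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        // Assumption: admin/admin exists and port 15672 is published, as in the compose file.
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:15672/api/overview"))
                .header("Authorization", basicAuthHeader("admin", "admin"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Print the status code and the JSON overview returned by the management API.
        System.out.println("HTTP " + response.statusCode());
        System.out.println(response.body());
    }
}
```

A 200 response with a JSON body suggests the setup is working; a 401 usually points at the credentials.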
Integrating Spring Boot: a delay queue with DLX + TTL
  • Constants required by the dead-letter queue
package com.example.rabbitmq.demo.constants;


public interface DeadMQConstants {

    String ORDER_QUEUE_KEY = "order.routing.key";

    String ORDER_DEAD_QUEUE = "order.dead.queue";

    String ORDER_DEAD_DIRECT_EXCHANGE = "order.dead.direct.exchange";

    String ORDER_QUEUE = "order.queue";

    String ORDER_DIRECT_EXCHANGE = "order.direct.exchange";

}
  • Dead-letter queue configuration
package com.example.rabbitmq.demo.config;

import com.example.rabbitmq.demo.constants.DeadMQConstants;
import com.google.common.collect.Maps;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;


@Configuration
public class RabbitMQDeadConfig {

    // Dead-letter queue: consumers listen here and receive messages once they have expired.
    @Bean
    public Queue orderDeadQueue() {
        return new Queue(DeadMQConstants.ORDER_DEAD_QUEUE);
    }

    // Dead-letter exchange that expired messages are republished to.
    @Bean
    public DirectExchange orderDeadDirectExchange() {
        return new DirectExchange(DeadMQConstants.ORDER_DEAD_DIRECT_EXCHANGE);
    }

    // Bind the dead-letter queue to the dead-letter exchange.
    @Bean
    public Binding orderDeadBinding(@Qualifier("orderDeadQueue") Queue orderDeadQueue,
                                    @Qualifier("orderDeadDirectExchange") DirectExchange orderDeadDirectExchange) {
        return BindingBuilder.bind(orderDeadQueue).to(orderDeadDirectExchange).with(DeadMQConstants.ORDER_QUEUE_KEY);
    }

    // Order queue: holds messages until their TTL expires, then dead-letters them.
    // No consumer should listen on this queue, otherwise messages are consumed without any delay.
    @Bean
    public Queue orderQueue() {
        Map<String, Object> args = Maps.newHashMap();
        args.put("x-dead-letter-exchange", DeadMQConstants.ORDER_DEAD_DIRECT_EXCHANGE);
        args.put("x-dead-letter-routing-key", DeadMQConstants.ORDER_QUEUE_KEY);
        // durable = true, exclusive = false, autoDelete = false
        return new Queue(DeadMQConstants.ORDER_QUEUE, true, false, false, args);
    }

    // Exchange that producers publish order messages to.
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(DeadMQConstants.ORDER_DIRECT_EXCHANGE);
    }

    // Bind the order queue to the order exchange.
    @Bean
    public Binding bindingOrder(@Qualifier("orderQueue") Queue orderQueue,
                                @Qualifier("orderExchange") DirectExchange orderExchange) {
        return BindingBuilder.bind(orderQueue).to(orderExchange).with(DeadMQConstants.ORDER_QUEUE_KEY);
    }

}

  • Test class
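@Test
public void testSendDeadMsg() {
    String msg = "hello dead message";
    // Publish to the order exchange; the message waits in order.queue until it expires.
    this.amqpTemplate.convertAndSend(
            DeadMQConstants.ORDER_DIRECT_EXCHANGE,
            DeadMQConstants.ORDER_QUEUE_KEY,
            msg,
            message -> {
                // Persist the message so it survives a broker restart.
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                // Per-message TTL in milliseconds, passed as a string: dead-lettered after about 3 seconds.
                message.getMessageProperties().setExpiration("3000");
                return message;
            });
}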
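One caveat of the per-message TTL used in the test above: RabbitMQ only expires messages when they reach the head of the queue, so a message with a short TTL can be held back by an earlier message with a longer one. The following is a plain-Java simulation of that behaviour, not broker code. The class and method names are hypothetical, and it assumes a FIFO queue with no consumers.

```java
import java.util.ArrayList;
import java.util.List;

class TtlHeadOfLineSketch {

    // A queued message: publish time and per-message TTL, both in milliseconds.
    static final class Msg {
        final String id;
        final long publishedAt;
        final long ttl;

        Msg(String id, long publishedAt, long ttl) {
            this.id = id;
            this.publishedAt = publishedAt;
            this.ttl = ttl;
        }
    }

    // Returns, in queue order, the time at which each message would be dead-lettered
    // when expiry is only evaluated at the head of the queue.
    static List<Long> deadLetterTimes(List<Msg> queue) {
        List<Long> times = new ArrayList<>();
        long previous = Long.MIN_VALUE;
        for (Msg m : queue) {
            long ownExpiry = m.publishedAt + m.ttl;
            // A message cannot leave before the one in front of it has left.
            long actual = Math.max(ownExpiry, previous);
            times.add(actual);
            previous = actual;
        }
        return times;
    }

    public static void main(String[] args) {
        List<Msg> queue = List.of(
                new Msg("order-1", 0, 10_000),  // long TTL, published first
                new Msg("order-2", 0, 3_000));  // short TTL, stuck behind order-1
        System.out.println(deadLetterTimes(queue)); // prints [10000, 10000]
    }
}
```

Here order-2 asks for 3 seconds but is dead-lettered only after order-1's 10 seconds. If every message uses the same delay this does not matter; for mixed delays, the delayed message plugin is one option.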
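Configuring the plugin to implement a delay queue
  • On a fresh broker, only the direct, fanout, headers, and topic exchange types can be created

  • Download the plugin (pick a release that matches your RabbitMQ version)
    https://www.rabbitmq.com/community-plugins.html

  • Copy the plugin into the container, then open a shell in it

    docker cp rabbitmq_delayed_message_exchange-3.10.0.ez breeze-rabbitmq:/plugins
    docker exec -it breeze-rabbitmq /bin/bash

  • Enable the plugin
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • The plugin is now enabled

  • In the web console, the x-delayed-message exchange type can now be selected when adding an exchange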

Implementing a delay queue with the delayed message plugin
  • Constants
package com.example.rabbitmq.demo.constants;


public interface DelayedMQConstants {

    String ORDER_DELAYED_ROUTING_KEY = "order.delayed.routing.key";

    String ORDER_DELAYED_DIRECT_EXCHANGE = "order.delayed.direct.exchange";

    String ORDER_DELAYED_QUEUE = "order.delayed.queue";
}
  • Configuration
package com.example.rabbitmq.demo.config;

import com.example.rabbitmq.demo.constants.DelayedMQConstants;
import com.google.common.collect.Maps;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;


@Configuration
public class RabbitMQDelayedConfig {

    // Queue that receives messages once their delay has elapsed.
    @Bean
    public Queue orderDelayedQueue() {
        return new Queue(DelayedMQConstants.ORDER_DELAYED_QUEUE);
    }

    // Exchange of type x-delayed-message (provided by the plugin).
    // x-delayed-type tells the plugin to route like a direct exchange after the delay.
    @Bean
    public CustomExchange orderDelayDirectExchange() {
        Map<String, Object> args = Maps.newHashMap();
        args.put("x-delayed-type", "direct");
        // durable = true, autoDelete = false
        return new CustomExchange(DelayedMQConstants.ORDER_DELAYED_DIRECT_EXCHANGE, "x-delayed-message", true, false, args);
    }

    // Bind the delayed queue to the delayed exchange.
    @Bean
    public Binding bindingNotify(@Qualifier("orderDelayedQueue") Queue queue,
                                 @Qualifier("orderDelayDirectExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(DelayedMQConstants.ORDER_DELAYED_ROUTING_KEY).noargs();
    }

}
  • Test
@Test
public void testSendDelayedMsg() {
    String msg = "hello delay message";
    amqpTemplate.convertAndSend(
            DelayedMQConstants.ORDER_DELAYED_DIRECT_EXCHANGE,
            DelayedMQConstants.ORDER_DELAYED_ROUTING_KEY,
            msg,
            message -> {
                // Delay in milliseconds, sent as the x-delay header: routed to the queue after about 6 seconds.
                message.getMessageProperties().setDelay(6000);
                return message;
            });
}
  • github-demo
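The article sends delayed messages but does not show the consuming side. Below is a minimal sketch of what an order-timeout handler might look like, written as plain Java so it runs on its own. It assumes the message body carries an order id (the tests above send a plain greeting string instead), and the in-memory store, class name, and method names are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OrderTimeoutHandlerSketch {

    enum Status { UNPAID, PAID, CANCELLED }

    // Hypothetical in-memory order store; a real service would use its database.
    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    void createOrder(String orderId) { orders.put(orderId, Status.UNPAID); }

    void pay(String orderId) { orders.replace(orderId, Status.UNPAID, Status.PAID); }

    Status statusOf(String orderId) { return orders.get(orderId); }

    // In a Spring Boot application this method would typically be annotated with
    // @RabbitListener(queues = DelayedMQConstants.ORDER_DELAYED_QUEUE)
    // (or DeadMQConstants.ORDER_DEAD_QUEUE for the DLX + TTL variant).
    void onDelayedMessage(String orderId) {
        // Atomic compare-and-set: only orders that are still unpaid get cancelled,
        // so paid orders and redelivered messages are left alone.
        boolean cancelled = orders.replace(orderId, Status.UNPAID, Status.CANCELLED);
        System.out.println(orderId + (cancelled ? " cancelled" : " left unchanged"));
    }

    public static void main(String[] args) {
        OrderTimeoutHandlerSketch handler = new OrderTimeoutHandlerSketch();
        handler.createOrder("order-1");
        handler.createOrder("order-2");
        handler.pay("order-2");
        handler.onDelayedMessage("order-1"); // unpaid, so it is cancelled
        handler.onDelayedMessage("order-2"); // already paid, left unchanged
        handler.onDelayedMessage("order-1"); // redelivery, left unchanged
    }
}
```

Making the handler idempotent in this way is a design choice rather than a requirement of either approach; it is meant to keep a redelivered or late message from undoing a payment.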