栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

基于redis客户端redission实现延迟队列的功能的工程实践

基于redis客户端redission实现延迟队列的功能的工程实践

目录

1、问题背景

2、延迟消息设计

2.1 RedisDelayQueue

2.2 RedisDelayQueueItem

2.3 RedisDelayQueueHandler

2.4 CompositeHandler

2.5 TestHandler

3、客户端代码


1、问题背景

     在大的技术框架限制下,没有MQ成熟解决方案的条件下,而JDK ScheduledExecutorService、DelayQueue又有其局限性,我们就需要自己去实现一个简单的延迟消息,所以基于Redis客户端Redission进行了一个简单的封装,这个就是为什么要自己实现一个延迟消息的大背景。

2、延迟消息设计

      我们使用一个基于Redis的分布式延迟队列来存储各种延迟消息,对延迟消息对象进行封装,赋予消息类型属性,然后,抽象出一个消息处理器接口出来,客户端只需要向延迟队列中投递消息,延迟队列组件,按照消息类型进行路由转发,由具体的处理器进行业务逻辑处理,这是一个典型的生产者-消费者模型。

2.1 RedisDelayQueue

     提供给客户端使用的延迟消息队列,生产者可以向该队列,投递延迟消息。

package cn.vetech.center.cps.flsc.service.common.delayqueue;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.plugin.core.PluginRegistry;
import org.springframework.plugin.core.config.EnablePluginRegistries;
import org.springframework.stereotype.Component;
import org.vetech.core.modules.utils.collection.ListUtil;

import com.google.common.base.Preconditions;

import cn.vetech.center.cps.flsc.common.ex.FlscRuntimeException;
import cn.vetech.center.cps.flsc.service.common.threadpool.ExecutorServiceUtil;
import cn.vetech.center.cps.flsc.service.common.threadpool.RunnableWrapper;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@EnablePluginRegistries(RedisDelayQueueHandler.class)
@Component
public class RedisDelayQueue implements CommandLineRunner {

    private static final String DELAY_QUEUE_NAME = "flsc_redis_delay_queue";

    
    private static final ExecutorService SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor();

    
    private static final ExecutorService EXECUTOR_SERVICE =
        ExecutorServiceUtil.newExecutorService("RedisDelayQueueComponent", 5, 1000);
    
    @Value("${flscRedisDelayQueue.prefix:flsc}")
    private String flscRedisDelayQueuePrefix;
    
    @Autowired
    private RedissonClient redissonClient;

    @Qualifier(RedisDelayQueueHandler.NAME)
    @Autowired
    private PluginRegistry redisDelayQueueHandlerRegistry;
 
    public void put(Serializable object, long delay, TimeUnit timeUnit, RedisDelayMsgTypeEnum redisDelayMsgTypeEnum) {
        Preconditions.checkNotNull(object, "object不能为Null");
        Preconditions.checkNotNull(timeUnit, "timeUnit不能为Null");
        Preconditions.checkNotNull(redisDelayMsgTypeEnum, "redisDelayMsgTypeEnum不能为Null");
        try {
            RBlockingDeque blockingDeque = this.redissonClient.getBlockingDeque(this.delayQueueName());
            RDelayedQueue delayedQueue = this.redissonClient.getDelayedQueue(blockingDeque);
            RedisDelayQueueItem redisDelayQueueItem = new RedisDelayQueueItem(object, redisDelayMsgTypeEnum);
            delayedQueue.offer(redisDelayQueueItem, delay, timeUnit);
            log.info("添加延时队列成功,队列:{},值:{},延迟时间:{}(秒)", redisDelayMsgTypeEnum.getCode(), object,
                timeUnit.toSeconds(delay));
        } catch (Exception e) {
            log.error("添加延时队列失败", e);

        }
    }

 
    private RedisDelayQueueItem get() {
        try {
            RBlockingDeque blockingDeque = this.redissonClient.getBlockingDeque(this.delayQueueName());
            RedisDelayQueueItem redisDelayQueueItem = (RedisDelayQueueItem)blockingDeque.take();
            return redisDelayQueueItem;
        } catch (Exception e) {
            log.error("get()异常", e);
        }
        return null;
    }

    
    private List redisDelayQueueHandlers(RedisDelayMsgTypeEnum redisDelayMsgTypeEnum) {
        List redisDelayQueueHandlerList =
            this.redisDelayQueueHandlerRegistry.getPluginsFor(redisDelayMsgTypeEnum);
        if (ListUtil.isNotEmpty(redisDelayQueueHandlerList)) {
            return redisDelayQueueHandlerList;
        }
        throw new FlscRuntimeException("非法的redisDelayQueueEnum:" + Objects.toString(redisDelayMsgTypeEnum));
    }

 
    @Override
    public void run(String... args) throws Exception {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    RedisDelayQueueItem redisDelayQueueItem = get();
                    if (null != redisDelayQueueItem) {
                        Runnable runnableOfHandler = new Runnable() {
                            @Override
                            public void run() {
                                List redisDelayQueueHandlerList =
                                    redisDelayQueueHandlers(redisDelayQueueItem.getRedisDelayMsgTypeEnum());
                              for (RedisDelayQueueHandler redisDelayQueueHandler : redisDelayQueueHandlerList) {
                                    redisDelayQueueHandler.execute(redisDelayQueueItem);
                                }
                            }
                        };
                        RunnableWrapper runnableWrapper = new RunnableWrapper(runnableOfHandler, "处理redis延迟队列中延迟消息");
                        EXECUTOR_SERVICE.execute(runnableWrapper);

                    }
                }

            }
        };
        SINGLE_THREAD_EXECUTOR.execute(runnable);

    }

 private String delayQueueName() {
        return this.flscRedisDelayQueuePrefix + ":" + DELAY_QUEUE_NAME;
    }
}

 

2.2 RedisDelayQueueItem

    对延迟消息的封装,延迟队列元素。

package cn.vetech.center.cps.flsc.service.common.delayqueue;

import java.io.Serializable;

import org.vetech.core.modules.utils.mapper.JsonMapper;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
public class RedisDelayQueueItem implements Serializable {
    private static final long serialVersionUID = 1L;
    
    private Object object;
    
    private RedisDelayMsgTypeEnum redisDelayMsgTypeEnum;

    public RedisDelayQueueItem(Object object, RedisDelayMsgTypeEnum redisDelayMsgTypeEnum) {
        this.object = object;
        this.redisDelayMsgTypeEnum = redisDelayMsgTypeEnum;
    }

    @SuppressWarnings("unchecked")
    public  T object() {
        return (T)object;
    }

    @Override
    public String toString() {
        return JsonMapper.nonEmptyMapper().toJson(this);
    }

}

2.3 RedisDelayQueueHandler

    延迟消息处理器接口。

package cn.vetech.center.cps.flsc.service.common.delayqueue;

import org.springframework.plugin.core.Plugin;


public interface RedisDelayQueueHandler extends Plugin {
    
    public static final String NAME = "redisDelayQueueHandlerRegistry";

    
    void execute(RedisDelayQueueItem redisDelayQueueItem);

}

2.4 CompositeHandler

复合型消息处理器抽象。

package cn.vetech.center.cps.flsc.service.common.delayqueue.handler;

import java.util.ArrayList;
import java.util.List;

import org.vetech.core.modules.utils.collection.ListUtil;

import cn.vetech.center.cps.flsc.common.constant.MagicNumberConst;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayMsgTypeEnum;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayQueueHandler;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayQueueItem;
import lombok.Data;



@Data
public abstract class CompositeHandler implements RedisDelayQueueHandler {
    private List redisDelayQueueHandlerList;

protected void add(RedisDelayQueueHandler redisDelayQueueHandler) {
        if (ListUtil.isEmpty(redisDelayQueueHandlerList)) {
            this.redisDelayQueueHandlerList = new ArrayList<>(MagicNumberConst.INT10);

        }
        this.redisDelayQueueHandlerList.add(redisDelayQueueHandler);
    }

@Override
    public final void execute(RedisDelayQueueItem redisDelayQueueItem) {
        if (ListUtil.isEmpty(this.redisDelayQueueHandlerList)) {
            return;
        }
        for (RedisDelayQueueHandler redisDelayQueueHandler : this.redisDelayQueueHandlerList) {
            if (redisDelayQueueHandler.supports(redisDelayQueueItem.getRedisDelayMsgTypeEnum())) {
                redisDelayQueueHandler.execute(redisDelayQueueItem);
            }

        }

    }

    @Override
    public final boolean supports(RedisDelayMsgTypeEnum redisDelayMsgTypeEnum) {
        if (ListUtil.isEmpty(this.redisDelayQueueHandlerList)) {
            return false;
        }
        for (RedisDelayQueueHandler redisDelayQueueHandler : this.redisDelayQueueHandlerList) {
            if (redisDelayQueueHandler.supports(redisDelayMsgTypeEnum)) {
                return true;
            }
        }
        return false;
    }

}

2.5 TestHandler
package cn.vetech.center.cps.flsc.service.common.delayqueue.handler;

import org.springframework.stereotype.Component;

import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayMsgTypeEnum;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayQueueHandler;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayQueueItem;
import lombok.extern.slf4j.Slf4j;


@Component
@Slf4j
public class TestHandler implements RedisDelayQueueHandler {

    @Override
    public void execute(RedisDelayQueueItem redisDelayQueueItem) {
        log.info("消费延迟消息--开始");
        String object = redisDelayQueueItem.object();
        log.info(object);
        log.info("消费延迟消息--结束");
    }

   @Override
    public boolean supports(RedisDelayMsgTypeEnum redisDelayQueueEnum) {
        return RedisDelayMsgTypeEnum.TEST == redisDelayQueueEnum;
    }
}

3、客户端代码
package cn.vetech.center.cps.flsc.service.common.delayqueue;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import cn.vetech.center.cps.flsc.FlscCpsApplication;
import lombok.extern.slf4j.Slf4j;


@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FlscCpsApplication.class)
public class RedisDelayQueueTest {
    @Autowired
    private RedisDelayQueue redisDelayQueue;

    
  @Test
    public void putTest() throws InterruptedException {
        log.info("投递延迟消息");
        this.redisDelayQueue.put("liqiao", 30, TimeUnit.SECONDS, RedisDelayMsgTypeEnum.TEST);
        TimeUnit.MINUTES.sleep(10);
    }



    @Test
    public void put2Test() throws InterruptedException {
        log.info("投递延迟消息");
        this.redisDelayQueue.put("liqiao", 10, TimeUnit.SECONDS, RedisDelayMsgTypeEnum.TEST2);
        TimeUnit.MINUTES.sleep(10);
    }
}

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号