目录
1、问题背景
2、延迟消息设计
2.1 RedisDelayQueue
2.2 RedisDelayQueueItem
2.3 RedisDelayQueueHandler
2.4 CompositeHandler
2.5 TestHandler
3、客户端代码
1、问题背景
在大的技术框架限制下,没有MQ成熟解决方案的条件下,而JDK ScheduledExecutorService、DelayQueue又有其局限性,我们就需要自己去实现一个简单的延迟消息,所以基于Redis客户端Redission进行了一个简单的封装,这个就是为什么要自己实现一个延迟消息的大背景。
2、延迟消息设计
我们使用一个基于Redis的分布式延迟队列来存储各种延迟消息,对延迟消息对象进行封装,赋予消息类型属性,然后,抽象出一个消息处理器接口出来,客户端只需要向延迟队列中投递消息,延迟队列组件,按照消息类型进行路由转发,由具体的处理器进行业务逻辑处理,这是一个典型的生产者-消费者模型。
2.1 RedisDelayQueue
提供给客户端使用的延迟消息队列,生产者可以向该队列,投递延迟消息。
package cn.vetech.center.cps.flsc.service.common.delayqueue;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.plugin.core.PluginRegistry;
import org.springframework.plugin.core.config.EnablePluginRegistries;
import org.springframework.stereotype.Component;
import org.vetech.core.modules.utils.collection.ListUtil;
import com.google.common.base.Preconditions;
import cn.vetech.center.cps.flsc.common.ex.FlscRuntimeException;
import cn.vetech.center.cps.flsc.service.common.threadpool.ExecutorServiceUtil;
import cn.vetech.center.cps.flsc.service.common.threadpool.RunnableWrapper;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@EnablePluginRegistries(RedisDelayQueueHandler.class)
@Component
public class RedisDelayQueue implements CommandLineRunner {
private static final String DELAY_QUEUE_NAME = "flsc_redis_delay_queue";
private static final ExecutorService SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor();
private static final ExecutorService EXECUTOR_SERVICE =
ExecutorServiceUtil.newExecutorService("RedisDelayQueueComponent", 5, 1000);
@Value("${flscRedisDelayQueue.prefix:flsc}")
private String flscRedisDelayQueuePrefix;
@Autowired
private RedissonClient redissonClient;
@Qualifier(RedisDelayQueueHandler.NAME)
@Autowired
private PluginRegistry redisDelayQueueHandlerRegistry;
public void put(Serializable object, long delay, TimeUnit timeUnit, RedisDelayMsgTypeEnum redisDelayMsgTypeEnum) {
Preconditions.checkNotNull(object, "object不能为Null");
Preconditions.checkNotNull(timeUnit, "timeUnit不能为Null");
Preconditions.checkNotNull(redisDelayMsgTypeEnum, "redisDelayMsgTypeEnum不能为Null");
try {
RBlockingDeque
2.2 RedisDelayQueueItem
对延迟消息的封装,延迟队列元素。
package cn.vetech.center.cps.flsc.service.common.delayqueue;
import java.io.Serializable;
import org.vetech.core.modules.utils.mapper.JsonMapper;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
public class RedisDelayQueueItem implements Serializable {
private static final long serialVersionUID = 1L;
private Object object;
private RedisDelayMsgTypeEnum redisDelayMsgTypeEnum;
public RedisDelayQueueItem(Object object, RedisDelayMsgTypeEnum redisDelayMsgTypeEnum) {
this.object = object;
this.redisDelayMsgTypeEnum = redisDelayMsgTypeEnum;
}
@SuppressWarnings("unchecked")
public T object() {
return (T)object;
}
@Override
public String toString() {
return JsonMapper.nonEmptyMapper().toJson(this);
}
}
2.3 RedisDelayQueueHandler
延迟消息处理器接口。
package cn.vetech.center.cps.flsc.service.common.delayqueue; import org.springframework.plugin.core.Plugin; public interface RedisDelayQueueHandler extends Plugin{ public static final String NAME = "redisDelayQueueHandlerRegistry"; void execute(RedisDelayQueueItem redisDelayQueueItem); }
2.4 CompositeHandler
复合型消息处理器抽象。
package cn.vetech.center.cps.flsc.service.common.delayqueue.handler;
import java.util.ArrayList;
import java.util.List;
import org.vetech.core.modules.utils.collection.ListUtil;
import cn.vetech.center.cps.flsc.common.constant.MagicNumberConst;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayMsgTypeEnum;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayQueueHandler;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayQueueItem;
import lombok.Data;
@Data
public abstract class CompositeHandler implements RedisDelayQueueHandler {
private List redisDelayQueueHandlerList;
protected void add(RedisDelayQueueHandler redisDelayQueueHandler) {
if (ListUtil.isEmpty(redisDelayQueueHandlerList)) {
this.redisDelayQueueHandlerList = new ArrayList<>(MagicNumberConst.INT10);
}
this.redisDelayQueueHandlerList.add(redisDelayQueueHandler);
}
@Override
public final void execute(RedisDelayQueueItem redisDelayQueueItem) {
if (ListUtil.isEmpty(this.redisDelayQueueHandlerList)) {
return;
}
for (RedisDelayQueueHandler redisDelayQueueHandler : this.redisDelayQueueHandlerList) {
if (redisDelayQueueHandler.supports(redisDelayQueueItem.getRedisDelayMsgTypeEnum())) {
redisDelayQueueHandler.execute(redisDelayQueueItem);
}
}
}
@Override
public final boolean supports(RedisDelayMsgTypeEnum redisDelayMsgTypeEnum) {
if (ListUtil.isEmpty(this.redisDelayQueueHandlerList)) {
return false;
}
for (RedisDelayQueueHandler redisDelayQueueHandler : this.redisDelayQueueHandlerList) {
if (redisDelayQueueHandler.supports(redisDelayMsgTypeEnum)) {
return true;
}
}
return false;
}
}
2.5 TestHandler
package cn.vetech.center.cps.flsc.service.common.delayqueue.handler;
import org.springframework.stereotype.Component;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayMsgTypeEnum;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayQueueHandler;
import cn.vetech.center.cps.flsc.service.common.delayqueue.RedisDelayQueueItem;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class TestHandler implements RedisDelayQueueHandler {
@Override
public void execute(RedisDelayQueueItem redisDelayQueueItem) {
log.info("消费延迟消息--开始");
String object = redisDelayQueueItem.object();
log.info(object);
log.info("消费延迟消息--结束");
}
@Override
public boolean supports(RedisDelayMsgTypeEnum redisDelayQueueEnum) {
return RedisDelayMsgTypeEnum.TEST == redisDelayQueueEnum;
}
}
3、客户端代码
package cn.vetech.center.cps.flsc.service.common.delayqueue;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import cn.vetech.center.cps.flsc.FlscCpsApplication;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FlscCpsApplication.class)
public class RedisDelayQueueTest {
@Autowired
private RedisDelayQueue redisDelayQueue;
@Test
public void putTest() throws InterruptedException {
log.info("投递延迟消息");
this.redisDelayQueue.put("liqiao", 30, TimeUnit.SECONDS, RedisDelayMsgTypeEnum.TEST);
TimeUnit.MINUTES.sleep(10);
}
@Test
public void put2Test() throws InterruptedException {
log.info("投递延迟消息");
this.redisDelayQueue.put("liqiao", 10, TimeUnit.SECONDS, RedisDelayMsgTypeEnum.TEST2);
TimeUnit.MINUTES.sleep(10);
}
}
package cn.vetech.center.cps.flsc.service.common.delayqueue;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import cn.vetech.center.cps.flsc.FlscCpsApplication;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FlscCpsApplication.class)
public class RedisDelayQueueTest {
@Autowired
private RedisDelayQueue redisDelayQueue;
@Test
public void putTest() throws InterruptedException {
log.info("投递延迟消息");
this.redisDelayQueue.put("liqiao", 30, TimeUnit.SECONDS, RedisDelayMsgTypeEnum.TEST);
TimeUnit.MINUTES.sleep(10);
}
@Test
public void put2Test() throws InterruptedException {
log.info("投递延迟消息");
this.redisDelayQueue.put("liqiao", 10, TimeUnit.SECONDS, RedisDelayMsgTypeEnum.TEST2);
TimeUnit.MINUTES.sleep(10);
}
}



