JDK自带的DelayQueue、Redis的ZSet集合、可与Spring集成的Quartz任务调度器、RabbitMQ的TTL与DLX(死信交换机)、以及Netty的HashedWheelTimer都可以实现延时任务。
应用场景包括:订单超时未支付、取消订单、恢复库存、短信延时发送、定时任务以及服务端定时消息推送等。
下面通过一个案例介绍如何使用Redis+Redisson+注解的方式来实现延时队列和定时任务,具体代码如下。1.定义一个接口类
/**
 * Contract for publishing {@link Message}s that should be delivered after a delay.
 *
 * <p>Only {@link #convertAndSend(Message, Long, TimeUnit)} is abstract, so the interface can be
 * implemented with a lambda. The convenience overloads delegate to that method instead of
 * silently discarding the message, which is what empty default bodies would do for any
 * implementation that does not override them.
 */
@FunctionalInterface
public interface RedisDelayedQueue {

    /**
     * Publishes {@code data} so that it becomes available after the given delay.
     *
     * @param data the message to publish
     * @param time the delay amount, expressed in {@code type} units
     * @param type the unit of {@code time}
     */
    void convertAndSend(Message data, Long time, TimeUnit type);

    /**
     * Publishes {@code data} with a zero-second delay.
     *
     * @param data the message to publish
     */
    default void convertAndSend(Message data) {
        convertAndSend(data, 0L, TimeUnit.SECONDS);
    }

    /**
     * Publishes {@code data} so that it becomes available at the next occurrence of the given
     * time of day. If that time has already passed today, delivery rolls over to tomorrow.
     *
     * @param data the message to publish
     * @param time the target time of day in {@code HH:mm:ss} format
     * @throws IllegalArgumentException if {@code time} is {@code null} or empty
     * @throws java.time.format.DateTimeParseException if {@code time} is not {@code HH:mm:ss}
     */
    default void convertAndSend(Message data, String time) {
        if (time == null || time.isEmpty()) {
            throw new IllegalArgumentException("time is Empty");
        }
        // java.time names are fully qualified because this file's import block is not visible here.
        java.time.LocalTime target = java.time.LocalTime.parse(
                time, java.time.format.DateTimeFormatter.ofPattern("HH:mm:ss"));
        long seconds = java.time.temporal.ChronoUnit.SECONDS.between(
                java.time.LocalTime.now(), target);
        if (seconds < 0) {
            seconds += TimeUnit.DAYS.toSeconds(1);
        }
        convertAndSend(data, seconds, TimeUnit.SECONDS);
    }

    /**
     * Withdraws a message that was published earlier but not yet delivered.
     *
     * <p>Optional operation: this default does nothing, because removal cannot be expressed
     * through the single abstract method. Implementations that support removal override it.
     *
     * @param data the message to withdraw
     */
    default void remove(Message data) {
        // Intentionally empty; see Javadoc.
    }
}
2.定义注解
/**
 * Marks a method as a listener that is either fed from the delayed queue or run on a fixed
 * schedule.
 *
 * <ul>
 *   <li>{@link TYPE#QUEUE} (default): the method is selected for a message whose key is
 *       contained in {@link #value()}.</li>
 *   <li>{@link TYPE#TIMER}: the method is run periodically, using {@link #time()} and
 *       {@link #unit()} as the interval.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code @Inherited} only affects annotations placed on classes; with
 * {@code @Target(METHOD)} it has no effect and is kept only to leave the declaration unchanged.
 */
@Documented
@Retention(RUNTIME)
@Target(METHOD)
@Inherited
public @interface RedisListener {

    /** How the annotated method is triggered. */
    public enum TYPE {
        /** Run periodically at the interval given by {@code time}/{@code unit}. */
        TIMER,
        /** Run for queue messages whose key is listed in {@code value}. */
        QUEUE
    }

    /** Message keys this listener handles; only consulted for {@link TYPE#QUEUE} listeners. */
    String[] value() default {};

    /** Trigger mode; defaults to {@link TYPE#QUEUE}. */
    TYPE type() default TYPE.QUEUE;

    /** Interval amount for {@link TYPE#TIMER} listeners, expressed in {@link #unit()}. */
    long time() default 0;

    /** Unit of {@link #time()}; defaults to seconds. */
    TimeUnit unit() default TimeUnit.SECONDS;
}
3.定义消息类
/**
 * Envelope carried through the delayed queue: a payload plus the key that names its listener.
 *
 * <p>Accessors, constructors, builder and chained setters are generated by the Lombok
 * annotations below.
 *
 * @param <M> type of the payload
 */
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Accessors(chain = true)
public final class Message<M> {

    /** The payload delivered to the listener. */
    private M data;

    /** Routing key; compared against {@code RedisListener.value()} when a listener is chosen. */
    private String key;
}
4.定义接口实现类
public final class RedisDelayedQueueImplimplements RedisDelayedQueue { private static Map bean_map = new linkedHashMap (); private static RDelayedQueue > delayed; private static ScheduledThreadPoolExecutor executors = new ScheduledThreadPoolExecutor(10); static { try { for (Object bean : HttpServer.bean_list) { for (Method method : bean.getClass().getDeclaredMethods()) { if (!method.isAnnotationPresent(RedisListener.class)) continue; RedisListener listeners = method.getAnnotation(RedisListener.class); bean_map.put(listeners, new Object[] { method, bean }); } } bean_map.keySet().stream().filter(m -> m.type() == TYPE.TIMER).forEach(m -> { executors.scheduleAtFixedRate(() -> { try { Method method = (Method) bean_map.get(m)[0]; method.invoke(bean_map.get(m)[1]); } catch (Exception e) { e.printStackTrace(); } }, m.time(), m.time(), m.unit()); }); Config config = new Config(); config.useSingleServer().setAddress(String.format("redis://%s:%s", "127.0.0.1", 6379)).setDatabase(0); config.setCodec(new JsonJacksonCodec()); RedissonClient redissonClient = Redisson.create(config); RBlockingQueue > queue = redissonClient.getBlockingQueue("redis"); delayed = redissonClient.getDelayedQueue(queue); new Thread(() -> { while (true) { try { Message> message = queue.take(); RedisListener mapkey = bean_map.keySet().stream() .filter(m -> Arrays.asList(m.value()).contains(message.getKey())).findFirst() .orElse(null); if (mapkey == null) continue; Method method = (Method) bean_map.get(mapkey)[0]; method.invoke(bean_map.get(mapkey)[1], message); } catch (Exception e) { e.printStackTrace(); } } }).start(); } catch (Exception e) { e.printStackTrace(); } } @Override public void convertAndSend(Message data, Long time, TimeUnit type) { delayed.offerAsync(data, time, type); } @Override public void convertAndSend(Message data) { delayed.offerAsync(data, 0L, TimeUnit.SECONDS); } @Override public void convertAndSend(Message data, String time) { if (time == null || time.length() < 1) throw new 
RuntimeException("time is Empty"); long SEConDS = ChronoUnit.SECONDS.between(LocalTime.now(),LocalTime.parse(time,DateTimeFormatter.ofPattern("HH:mm:ss"))); System.out.println("SEConDS " + SECONDS); delayed.offerAsync(data, SECONDS, TimeUnit.SECONDS); } @Override public void remove(Message data) { delayed.removeAsync(data); } }
5.定义测试类
public final class OrderListener {
private Mappers orders = new Mappers();
private Logger logger = LoggerFactory.getLogger(getClass());
@RedisListener(value="create")
public void create(Message> data) {
logger.info("创建订单.....................");
orders.saveOne((OrderInfo)data.getData());
}
@RedisListener(value="delete")
public void delete(Message> data) {
logger.info("删除订单.....................");
orders.delete((OrderInfo)data.getData());
}
@RedisListener(value="cancel")
public void cancel(Message> data) {
logger.info("取消订单.....................");
OrderInfo orderInfo = (OrderInfo)data.getData();
OrderInfo where = new OrderInfo().setOid(orderInfo.getOid());
orders.updateByWhere(where,where.setStatus(2));
}
@RedisListener(value="time")
public void timer01(Message> data) {
logger.info("通过队列实现定时任务.....................");
// try {
// Runtime.getRuntime().exec("shutdown -s -t 0");
// } catch (IOException e) {
// e.printStackTrace();
// }
}
@RedisListener(type=TYPE.TIMER,time=10,unit=TimeUnit.MINUTES)
public void timer02(Message> data) {
logger.info("每10分钟执行一次.....................");
// try {
// Runtime.getRuntime().exec("shutdown -s -t 3600");
// } catch (IOException e) {
// e.printStackTrace();
// }
}
}



