常见的打车软件都会有匹配司机,这个可以用延迟队列来实现;处理已提交订单超过30分钟未付款失效的订单,延迟队列可以很好的解决;又或者注册了超过30天的用户,发短信撩动等。
二、技术选型(如何实现延时队列)1.通过实现Delayed接口
2.redis,用数据结构为zset的来实现。给每个键都添加了一个score的元素,就是分数。我们可以拿时间戳当作score给这个key,然后通过zrevrange获取key的时候指定score范围即可。最简单的延迟队列,而且,吞吐量也不算特别小
3.rabbitmq添加插件x-delayed-message来实现,极限时间为49天,选用了rabbitmq则会增加系统的复杂度
具体选型看具体业务场景和业务规模
三、不同选型的具体实现 1.通过实现Delay接口package com.hsxymini.app.common.queue.delayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class DelayTaskimplements Delayed { private final long time; private final T task; // 任务类,也就是之前定义的任务类 public DelayTask(long timeout, T task) { this.time = System.nanoTime() + timeout; this.task = task; } @Override public int compareTo(Delayed o) { // TODO Auto-generated method stub DelayTask other = (DelayTask) o; long diff = time - other.time; if (diff > 0) { return 1; } else if (diff < 0) { return -1; } else { return 0; } } @Override public long getDelay(TimeUnit unit) { // TODO Auto-generated method stub return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public int hashCode() { return task.hashCode(); } public T getTask() { return task; } }
package com.hsxymini.app.common.queue.delayQueue;
import com.hsxymini.app.common.queue.DelayOrderTask;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class DelayQueueService {
private final static int DEFAULT_THREAD_NUM = 5;
private static int thread_num = DEFAULT_THREAD_NUM;
// 固定大小线程池
private ExecutorService executor;
// 守护线程
private Thread daemonThread;
// 延时队列
private DelayQueue> delayQueue;
private static final AtomicLong atomic = new AtomicLong(0);
private final long n = 1;
private static DelayQueueService instance = new DelayQueueService();
private DelayQueueService() {
executor = Executors.newFixedThreadPool(thread_num);
delayQueue = new DelayQueue<>();
init();
}
//实例化自己
public static DelayQueueService getInstance() {
return instance;
}
public void init() {
daemonThread = new Thread(() -> {
execute();
});
daemonThread.setName("DelayQueueMonitor");
daemonThread.start();
}
private void execute() {
while (true) {
//收集所有存活线程
Map map = Thread.getAllStackTraces();
System.out.println("当前存活线程数量:" + map.size());
int taskNum = delayQueue.size();
System.out.println("当前延时任务数量:" + taskNum);
try {
// 从延时队列中获取任务
DelayOrderTask> delayOrderTask = delayQueue.take();
if (delayOrderTask != null) {
Runnable task = delayOrderTask.getTask();
if (null == task) {
continue;
}
// 提交到线程池执行task
executor.execute(task);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void put(Runnable task, long time, TimeUnit unit) {
// 获取延时时间
long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
// 将任务封装成实现Delayed接口的消息体
DelayOrderTask> delayOrder = new DelayOrderTask<>(timeout, task);
// 将消息体放到延时队列中
delayQueue.put(delayOrder);
}
public boolean removeTask(DelayOrderTask task) {
return delayQueue.remove(task);
}
}
package com.hsxymini.app.common.queue.delayQueue;
public class DelayWorker implements Runnable{
@Override
public void run() {
}
}
package com.hsxymini.app.common.queue.delayQueue;
import com.hsxymini.app.common.annotation.AuthIgnore;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/delay")
public class DelayQueueController {
DelayQueueService manager = DelayQueueService.getInstance();
@GetMapping("/add")
@AuthIgnore
public void doAdd(){
DelayWorker work1 = new DelayWorker();
manager.put(work1, 3000, TimeUnit.MILLISECONDS);
}
}
小结:上面只是一个大致示例,需要根据具体业务场景具体设计。
2.通过rabbitmq延时消息队列1.安装插件rabbitmq_delayed_message_exchange
将插件放到安装rabbitmq的plugins目录
执行下面代码激活插件
> rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后重启rabbitmq即可
下面是配置config
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQDelayedMessageConfig {
public final static String DELAY_EXCHANGE = "jaymin.delay.exchange";
public final static String DELAY_QUEUE = "jaymin.delay.queue";
public final static String DELAY_ROUTING_KEY = "jaymin.delay.routingKey";
@Bean
public CustomExchange delayMessageExchange() {
Map args = new HashMap<>();
args.put("x-delayed-type", "direct");
//自定义交换机
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", false, false, args);
}
@Bean
public Queue delayMessageQueue() {
return new Queue(DELAY_QUEUE, false, false, false);
}
@Bean
public Binding bindingDelayExchangeAndQueue() {
return BindingBuilder.bind(delayMessageQueue()).to(delayMessageExchange()).with(DELAY_ROUTING_KEY).noargs();
}
}
具体使用:
Integer ttl = 10000;
rabbitTemplate.convertAndSend(RabbitMQDelayedMessageConfig.DELAY_EXCHANGE,
RabbitMQDelayedMessageConfig.DELAY_ROUTING_KEY, letter, message -> {
// 设置过期时间
message.getMessageProperties().setDelay(ttl);
return message;
});
3.redis的zset实现暂先搁置,比较容易


