DelayQueue使用:
DelayQueue是BlockingQueue的一种,所以它是线程安全的。
时间到期后元素才能被取出使用,所以可以通过设置时间来控制元素输出顺序。
使用DelayQueue首先需要实现Delayed:
eg:
@Data
public class DelayItem implements Delayed{
private long time;
String delayKey;
public DelayItem(String delayKey, long time, TimeUnit unit) {
this.delayKey = delayKey;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
DelayItem item = (DelayItem) o;
long diff = this.time - item.time;
if (diff <= 0) {// 改成>=会造成问题
return -1;
} else {
return 1;
}
}
}
public class MemoryData {
public static DelayQueue delayQueue = new DelayQueue<>();
}
生产者可以通过redis记录详细数据,delatItem只记录redis的key。
eg:
redisBaseService.sAdd(key, value);
DelayItem delayItem = new DelayItem(key, 1, TimeUnit.MINUTES);
MemoryData.delayQueue.put(delayItem);
消费者可以增加监听,每分钟监听延迟队列里是否存在到期数据,甚至启停服务时,将未处理的数据放入redis,下次起
服务时,从redis中获取对应数据。
eg:
启动类:
ConfigurableApplicationContext run =
SpringApplication.run(Application.class, args);
run.addApplicationListener(new DataMergeContainer());
使用类:
@Component
public class DataMergeContainer implements ApplicationListener, DisposableBean {
private static final Logger log = LoggerFactory.getLogger(DataMergeContainer.class);
@Autowired
private RedisBaseService redisBaseService;
private void peakData() {
while (MemoryData.delayQueue.isEmpty()) {
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
log.error("peakData sleep error", e);
}
}
if (MemoryData.delayQueue.size() > 0) {
for (int i = 0; i < MemoryData.delayQueue.size(); i++) {
DelayItem take;
try {
take = MemoryData.delayQueue.take();
String delayKey = take.getDelayKey();
if (!StringUtils.isBlank(delayKey)) {
//业务使用
}
} catch (InterruptedException e) {
log.info("peakData getData error", e);
}
}
}
//递归处置
peakData();
}
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
log.info("DataMergeContainer start.....");
while (redisBaseService.sSize("上次关闭服务记录数据的key") > 0) {
String key= (String) redisBaseService.sPop("上次关闭服务记录数据的key");
DelayItem delayItem = new DelayItem(key, 1, TimeUnit.MINUTES);
MemoryData.delayQueue.put(delayItem);
}
new Thread(() -> {
peakData();
}).start();
log.info("peakData getData start...");
}
@Override
public void destroy() {
for (int i = 0; i < MemoryData.delayQueue.size(); i++) {
DelayItem take;
try {
take = MemoryData.delayQueue.take();
if (StringUtils.isNotBlank(take.getDelayKey())) {
redisBaseService.sAdd("未处理完的服务记录数据的key", take.getDelayKey());
}
} catch (InterruptedException e) {
log.info("peakData getData error", e);
}
}
}
}
由于项目使用的是rabbitmq而非RocketMQ,否则使用RocketMQ的延迟消息机制会更方便,根据业务场景不同,可以使用不同的实现机制
包括不限于延迟队列,消息,定时任务等