import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class ThreadTest {
private static final Semaphore SEMAPHORE = new Semaphore(0);
private volatile static String result = "1";
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
@Test
public void test() throws Exception {
//模拟测试,在实际场景中建议使用springboot @Async开启异步任务
CompletableFuture.runAsync(() -> {
try {
//获取许可,若没有许可,一直等待
SEMAPHORE.acquire();
//将四个定时任务放入队列
//前三次定时等待时间为1秒钟
long firstTime = 1000;
//第四个等待定时等待时间为5秒钟
long secondTime = 5 * firstTime;
//将四个定时任务放入延时队列
List delayList = Arrays.asList(new MyDelay(firstTime), new MyDelay(firstTime), new MyDelay(firstTime), new MyDelay(secondTime));
DelayQueue delayQueue = new DelayQueue<>(delayList);
//队列不为空不退出循环,此场景永远不退出
while (!delayQueue.isEmpty()) {
if (!"1".equals(result)) {
//发送成功,退出循环,等待信号量发送许可(连接异常时重新进入循环)
//清除队列,等待下次重新入列
delayQueue.clear();
System.out.println("发送报文成功");
COUNT_DOWN_LATCH.countDown();
break;
}
//模拟发送失败,等待延时队列任务,继续发送报文
MyDelay take = delayQueue.take();
System.out.println(take.getDelay(TimeUnit.MILLISECONDS));
//若一轮(4个)定时任务结束,重新放入四个
if (delayQueue.isEmpty()) {
//重新放入队列
delayList = Arrays.asList(new MyDelay(firstTime), new MyDelay(firstTime), new MyDelay(firstTime), new MyDelay(secondTime));
delayQueue = new DelayQueue<>(delayList);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
//模拟连接状态异常
Thread.sleep(10 * 1000);
//发送许可(模拟连接状态异常)
SEMAPHORE.release(1);
//模拟发送报文成功
Thread.sleep(20 * 1000);
result = "2";
//主线程等待,在实际业务场景中不需要
COUNT_DOWN_LATCH.await();
}
}
class MyDelay implements Delayed {
long delayTime;
public MyDelay(long delayTime) {
this.delayTime = (System.currentTimeMillis() + delayTime);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
}