/**
 * Small demo of a bounded queue shared by two producer threads and one consumer thread.
 *
 * <p>The producers never block: when the queue is full they discard the oldest queued
 * element to make room for the newest one. The consumer drains the queue and stops once
 * no element has arrived within its poll timeout.
 */
public class TestUtil {

    /** Maximum number of elements the shared queue can hold. */
    private static final int QUEUE_CAPACITY = 5;

    /** Pause between two consecutive offers of a producer, in milliseconds. */
    private static final long PRODUCER_PAUSE_MILLIS = 10;

    /** Pause of the consumer before each poll, in milliseconds. */
    private static final long CONSUMER_PAUSE_MILLIS = 30;

    /** How long the consumer waits for an element before it gives up, in seconds. */
    private static final long CONSUMER_TIMEOUT_SECONDS = 1;

    /**
     * Starts two producers (values 0..9 and 20..29) and one consumer on a shared queue,
     * then waits for all three threads to finish.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Bounded blocking queue; the element type must be declared so that poll()
        // returns Integer rather than Object.
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

        Thread producer = new Thread(() -> produce(queue, 0, 10));
        // Second producer: same behavior, different value range.
        Thread producer1 = new Thread(() -> produce(queue, 20, 30));
        Thread consumer = new Thread(() -> consume(queue));

        producer.start();
        producer1.start();
        consumer.start();

        // Block the main thread until all workers are done.
        try {
            producer.join();
            producer1.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds {@code value} to {@code queue} without blocking. If the queue is full, head
     * elements (the oldest data) are removed until the value is accepted.
     *
     * <p>Removing the head and offering again is not one atomic step: another producer may
     * take the freed slot in between. The offer is therefore retried instead of being
     * attempted only once, so the new value is never silently lost.
     *
     * @param queue the bounded queue to add to
     * @param value the value to add
     * @return {@code true} if at least one older element had to be evicted,
     *         {@code false} if the value fit immediately
     */
    static boolean offerDroppingOldest(ArrayBlockingQueue<Integer> queue, int value) {
        if (queue.offer(value)) {
            return false;
        }
        do {
            // Discard stale data to make room; a null result only means the consumer
            // emptied the queue in the meantime, which is fine.
            queue.poll();
        } while (!queue.offer(value));
        return true;
    }

    /**
     * Producer loop: offers every value in {@code [from, to)} to the queue, evicting the
     * oldest element when the queue is full, and pauses briefly after each value.
     *
     * @param queue the shared queue
     * @param from  first value to produce (inclusive)
     * @param to    end of the value range (exclusive)
     */
    private static void produce(ArrayBlockingQueue<Integer> queue, int from, int to) {
        try {
            for (int i = from; i < to; i++) {
                System.out.println("producer offer " + i);
                if (offerDroppingOldest(queue, i)) {
                    System.out.println("producer poll and offer " + i);
                }
                Thread.sleep(PRODUCER_PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Consumer loop: repeatedly pauses, then polls the queue and prints the element.
     * Returns as soon as a poll times out without receiving an element.
     *
     * @param queue the shared queue
     */
    private static void consume(ArrayBlockingQueue<Integer> queue) {
        try {
            while (true) {
                Thread.sleep(CONSUMER_PAUSE_MILLIS);
                // A blocking take() would also work, but then this thread could never
                // stop; the timed poll lets it exit once the producers are done.
                Integer num = queue.poll(CONSUMER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (num == null) {
                    break;
                }
                System.out.println("consumer poll " + num);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}