1,前言
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员。
首先先搞清楚他与Collection是一个什么关系,然后我们看下jdk1.8的官方文档,
2,认识BlockingQueue
阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:
从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)
- 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。
- 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。
3,BlockingQueue的核心方法
public class BlockQueueDemo {
public static void main(String[] args) throws InterruptedException {
//阻塞队列
// ------------------------------------------------------------------------------
//抛出异常 有返回值
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(2);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
//IllegalStateException Queue full 线程已经慢
//System.out.println(blockingQueue.add("c"));
System.out.println("队首-->"+blockingQueue.element());//查看队首
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//NoSuchElementException 没有元素
//System.out.println(blockingQueue.remove());
// ------------------------------------------------------------------------------
//没有异常 有返回值
ArrayBlockingQueue blockingQueue1 = new ArrayBlockingQueue<>(2);
System.out.println(blockingQueue1.offer("a1"));
System.out.println(blockingQueue1.offer("b1"));
//返回false 不抛出异常
//System.out.println(blockingQueue1.offer("c"));
System.out.println("队首-->"+blockingQueue1.peek());//查看队首
System.out.println(blockingQueue1.poll());
System.out.println(blockingQueue1.poll());
//没有异常 null
//System.out.println(blockingQueue1.poll());
// ------------------------------------------------------------------------------
//等待 阻塞(等待超时)程序会关闭 没有返回值
ArrayBlockingQueue blockingQueue3 = new ArrayBlockingQueue<>(2);
blockingQueue3.offer("a3");
blockingQueue3.offer("b3");
//没有位置了 一直阻塞
//blockingQueue3.offer("c3",3,TimeUnit.SECONDS);
System.out.println(blockingQueue3.poll());
System.out.println(blockingQueue3.poll());
//没有这个元素一直阻塞
//blockingQueue3.poll(3,TimeUnit.SECONDS);
// ------------------------------------------------------------------------------
//等待 阻塞(一直阻塞)程序不会关闭 没有返回值
ArrayBlockingQueue blockingQueue2 = new ArrayBlockingQueue<>(2);
blockingQueue2.put("a2");
blockingQueue2.put("b2");
//没有位置了 一直阻塞
//blockingQueue2.put("c");
System.out.println(blockingQueue2.take());
System.out.println(blockingQueue2.take());
//没有这个元素一直阻塞
//System.out.println(blockingQueue2.take());
}
}
4,常见BlockingQueue
- ArrayBlockingQueue:底层是使用一个数组实现队列的,并且在构造ArrayBlockingQueue时需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此,在队列全满时执行入队将会阻塞,在队列为空时出队同样将会阻塞。
-
linkedBlockingQueue:是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部,容量限制是可选的,如果在初始化时没有指定容量,那么默认使用Integer.MAX_VALUE的最大值作为队列容量。维持有两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(可以+1),另一个是出队的方法(可以-1),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步
-
作为开发者,我们需要注意的是,如果构造一个linkedBlockingQueue对象,而没有指定其容量大小,linkedBlockingQueue会默认一个类似无限大小的容量,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。ArrayBlockingQueue和linkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。下面的代码演示了如何使用BlockingQueue:
public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { // 声明一个容量为10的缓存队列 BlockingQueuequeue = new linkedBlockingQueue (10); //new了三个生产者和一个消费者 Producer producer1 = new Producer(queue); // Producer producer2 = new Producer(queue); // Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 借助Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); // service.execute(producer2); // service.execute(producer3); service.execute(consumer); // 执行10s Thread.sleep(10 * 1000); producer1.stop(); // producer2.stop(); // producer3.stop(); Thread.sleep(2000); // 退出Executor service.shutdown(); } } public class Producer implements Runnable { private volatile boolean isRunning = true;//是否在运行标志 private BlockingQueue queue;//阻塞队列 private static AtomicInteger count = new AtomicInteger();//自动更新的值 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; //构造函数 public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { String data = null; Random r = new Random(); System.out.println("启动生产者线程!"); try { while (isRunning) { System.out.println(Thread.currentThread().getName()+"正在生产数据..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一个随机数 data = "data:" + count.incrementAndGet();//以原子方式将count当前值加1 System.out.println(Thread.currentThread().getName()+"将数据:" + data + "放入队列..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//设定的等待时间为2s,如果超过2s还没加进去返回true System.out.println(Thread.currentThread().getName()+"放入数据失败:" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出生产者线程!"); } } public void stop() { isRunning = false; } }public class Consumer implements Runnable { private BlockingQueuequeue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; //构造函数 public Consumer(BlockingQueue queue) { this.queue = queue; } public void run() { System.out.println("启动消费者线程!"); Random r = new Random(); boolean isRunning = true; try { while (isRunning) { System.out.println(Thread.currentThread().getName()+"正从队列获取数据..."); String data = queue.poll(2, TimeUnit.SECONDS);//有数据时直接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败 if (null != data) { System.out.println(Thread.currentThread().getName()+"拿到数据:" + data); System.out.println(Thread.currentThread().getName()+"正在消费数据:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。 isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消费者线程!"); } } } -
DelayQueue:DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
-
PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
-
SynchronousQueue:同步队列 没有容量,put一个元素必须等待取take出来才能继续put 不存储元素
public class BlockQueueDemo { public static void main(String[] args) throws InterruptedException { //同步队列 没有容量,put一个元素必须等待取take出来才能继续put 不存储元素 BlockingQueueblockingQueue4 = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"put a4"); blockingQueue4.put("a4"); System.out.println(Thread.currentThread().getName()+"put b4"); blockingQueue4.put("b4"); System.out.println(Thread.currentThread().getName()+"put c4"); blockingQueue4.put("c4"); } catch (InterruptedException e) { e.printStackTrace(); } },"T1").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"take"+blockingQueue4.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"take"+blockingQueue4.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"take"+blockingQueue4.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"T2").start(); } }
5,总结
这里主要总结一下ArrayBlockingQueue与linkedBlockingQueue的区别
ArrayBlockingQueue (1)一个对象数组+一把锁+两个条件(2)入队与出队都用同一把锁(3)在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高(4)采用了数组,必须指定大小,即容量有限(5)在插入或删除元素时不会产生或销毁任何额外的对象实例
linkedBlockingQueue(1)一个单向链表+两把锁+两个条件(2)两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。(3)在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多(4)采用了链表,最大容量为整数最大值,可看做容量无限(5)在插入或删除元素时会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别



