栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

突击并发编程JUC系列-阻塞队列 BlockingQueue

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

突击并发编程JUC系列-阻塞队列 BlockingQueue

> 突击并发编程JUC系列演示代码地址:
> https://github.com/mtcarpenter/JavaTutorial

什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  • 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

插入和移除操作的4种处理方式
方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出 IllegalStateException (“Queue full”)异常。当队列空时,从队列里获取元素会抛出NoSuchElementException 异常。
  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回 null 。
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里 take 元素,队列会阻塞住消费者线程,直到队列不为空。
  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

> 如果是无界阻塞队列,队列不可能会出现满的情况,所以使用 put 或 offer 方法永远不会被阻塞,而且使用offer方法时,该方法永远返回 true。

ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。

阻塞式写方法

在ArrayBlockingQueue中提供了两个阻塞式写方法,分别如下(在该队列中,无论是阻塞式写方法还是非阻塞式写方法,都不允许写入null)。

  • void put(E e):向队列的尾部插入新的数据,当队列已满时调用该方法的线程会进入阻塞,直到有其他线程对该线程执行了中断操作,或者队列中的元素被其他线程消费。
  • boolean offer(E e, long timeout, TimeUnit unit):向队列尾部写入新的数据,当队列已满时执行该方法的线程在指定的时间单位内将进入阻塞,直到到了指定的超时时间后,或者在此期间有其他线程对队列数据进行了消费。

put() 方法示例

public class ArrayBlockingQueueExample1 {
    public static void main(String[] args) {
 ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
 try {
     queue.put("class 1");
     queue.put("class 2");
     queue.put("class 3");
     // 超过指定得容量当前线程阻塞
     queue.put("class 4");
 } catch (InterruptedException e) {
     e.printStackTrace();
 }
    }
}
非阻塞式写方法

当队列已满时写入数据,如果不想使得当前线程进入阻塞,那么就可以使用非阻塞式的写操作方法。

  • boolean add(E e):向队列尾部写入新的数据,当队列已满时不会进入阻塞,但是该方法会抛出队列已满的异常。
  • boolean offer(E e):向队列尾部写入新的数据,当队列已满时不会进入阻塞,并且会立即返回 false。

add() 方法示例

public class ArrayBlockingQueueExample2 {
    public static void main(String[] args) {
 ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
 queue.add("class 1");
 queue.add("class 2");
 queue.add("class 3");
 //  超过指定容量 抛出异常
 queue.add("class 4");
    }
}
// 抛出异常
阻塞式读方法
  • E take():从队列头部获取数据,并且该数据会从队列头部移除,当队列为空时执行take方法的线程将进入阻塞,直到有其他线程写入新的数据,或者当前线程被执行了中断操作。
  • E poll(long timeout, TimeUnit unit):从队列头部获取数据并且该数据会从队列头部移除,如果队列中没有任何元素时则执行该方法,当前线程会阻塞指定的时间,直到在此期间有新的数据写入,或者阻塞的当前线程被其他线程中断,当线程由于超时退出阻塞时,返回值为null。

take() 方法示例

public class ArrayBlockingQueueExample3 {
    public static void main(String[] args) {
 ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
 queue.add("class 1");
 queue.add("class 2");
 queue.add("class 3");
 try {
     // 取出对头元素
     System.out.println(queue.take());
 } catch (InterruptedException e) {
     e.printStackTrace();
 }
 // 队列大小 
 System.out.println(queue.size());
    }
}
//class 1
// 2
非阻塞式读方法
  • E poll():从队列头部获取数据并且该数据会从队列头部移除,当队列为空时,该方法不会使得当前线程进入阻塞,而是返回null值。
  • E peek():当队列为空时,该方法不会使得当前线程进入阻塞,而是返回null值。
public class ArrayBlockingQueueExample4 {
    public static void main(String[] args) {
 ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
 // 队列无元素 直接返回 null
 System.out.println(queue.poll( ));
 System.out.println(queue.peek( ));
    }
}
// null
// null
部分源码
    public void put(E e) throws InterruptedException {
 // 检查元素
 checkNotNull(e);
 final ReentrantLock lock = this.lock;
 // 获取锁
 lock.lockInterruptibly();
 try {
     // 元素满 一直阻塞,队列非满时,被唤醒
     while (count == items.length)
  notFull.await();
     // 入队
     enqueue(e);
 } finally {
     lock.unlock();
 }
    }

    public E take() throws InterruptedException {
 final ReentrantLock lock = this.lock;
  // 获取锁
 lock.lockInterruptibly();
 try {
     // 队列为空 等待
     while (count == 0)
  notEmpty.await();
     // 出队
     return dequeue();
 } finally {
     lock.unlock();
 }
    }
linkedBlockingQueue

linkedBlockingQueue 是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

public class PriorityBlockingQueueExample1 {
    public static void main(String[] args) {
 PriorityBlockingQueue queue = new PriorityBlockingQueue();
 queue.offer(1);
 queue.offer(12);
 queue.offer(21);
 queue.offer(6);
		// 内部排序
 System.out.println(queue.poll()); // 1
 System.out.println(queue.poll()); // 6
 System.out.println(queue.poll()); // 12
 System.out.println(queue.poll()); //21

    }
}
DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。

  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  • 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

DelayQueue队列的元素必须实现Delayed接口。我们可以参考ScheduledThreadPoolExecutor里ScheduledFutureTask类的实现。

public class DelayQueueExample1 {

    public static void main(String[] args) throws InterruptedException {
 DelayQueue queue = new DelayQueue<>();
 // 延期3秒 处理
 queue.put(new DelayedEntry("A", 30000L));
 // 延期10 秒处理
 queue.add(new DelayedEntry("B", 10000L));
 // 延期 20 秒处理
 queue.add(new DelayedEntry("C", 20000L));
 int size = queue.size();
 System.out.println("当前时间是:" + LocalDateTime.now());
 // 从延时队列中获取元素, 将输出 A,B,C
 for (int i = 0; i < size; i++) {
     System.out.println(queue.take() + " ------ " + LocalDateTime.now());
 }
    }
}


class DelayedEntry implements Delayed {
    
    private final String value;
    
    private final long exeTime;

    DelayedEntry(String value, long exeTime) {
 this.value = value;
 this.exeTime = exeTime + System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
 return exeTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
 DelayedEntry t = (DelayedEntry) o;
 if (this.exeTime < t.exeTime) {
     return -1;
 } else if (this.exeTime > t.exeTime) {
     return 1;
 } else {
     return 0;
 }

    }

    @Override
    public String toString() {
 return "DelayedEntry{" +
  "value=" + value +
  ", exeTime=" + exeTime +
  '}';
    }
}

//当前时间是:2020-10-15T16:26:37.167
//DelayedEntry{value=B, exeTime=1602750407104} ------ 2020-10-15T16:26:47.117
// DelayedEntry{value=C, exeTime=1602750417104} ------ 2020-10-15T16:26:57.105
//DelayedEntry{value=A, exeTime=1602750427104} ------ 2020-10-15T16:27:07.104
SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。
它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的SynchronousQueue,如果设置为true,则等待的线程会采用先进先出的顺序访问队列。

linkedTransferQueue

linkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,linkedTransferQueue多了tryTransfer和transfer方法。

  • transfer方法

    如果当前有消费者正在等待接收元素(消费者使用 take() 方法或带时间限制的poll()方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。transfer 方法的关键代码如下

  • tryTransfer方法

    tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。
    对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回 true。

linkedBlockingDeque

linkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,linkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的 bug,使用时还是用带有First和Last后缀的方法更清楚。
在初始化linkedBlockingDeque 时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。


我是小春哥,从事 Java 后端开发,会一点前端、通过持续输出系列技术文章以文会友,如果本文能为您提供帮助,欢迎大家关注、点赞、分享支持,我们下期再见!

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号