栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Java多线程】JUC之并发容器—初识并发队列BlockingQueue(一)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Java多线程】JUC之并发容器—初识并发队列BlockingQueue(一)

文章目录

前言一.JUC

1.JUC结构图2.JUC并发容器3.JUC线程安全集合 二.什么是阻塞队列

1.在并发队列上JDK提供了两套实现2.阻塞队列与普通队列的区别3.有界和无界的区别4.ReentrantLock和Condition5.BlockingQueue接口6.主要操作7.如何选择合适的队列 三.初步使用阻塞队列

1.ArrayBlockingQueue(数组阻塞队列)2.linkedBlockingQueue(单向链表阻塞队列)3.PriorityBlockingQueue(优先级阻塞队列)4.DelayQueue(延迟阻塞队列)5.SynchronousQueue(同步阻塞队列)6.linkedTransferQueue(链表可转移阻塞队列)7.linkedBlockingDeque(双向链表阻塞队列)8.ConcurrentlinkedQueue(非阻塞单向链表队列)9.ConcurrentlinkedDeque(非阻塞双向链表队列)10.对比

前言

我的与多线程相关文章
并发编程最佳学习路线
【Java多线程】高并发修炼基础之高并发必须了解的概念
【Java多线程】了解线程的锁池和等待池概念
【Java多线程】了解Java锁机制
【Java多线程】线程通信

【Java基础】多线程从入门到掌握-第十五节.使用Concurrent集合
【Java多线程】JUC之线程池(一)与线程池的初识第四节.线程池的工作队列

一.JUC 1.JUC结构图

2.JUC并发容器

3.JUC线程安全集合

Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue。

接口线程不安全线程安全
ListArrayListCopyOnWriteArrayList
MapHashMapConcurrentHashMap
SetHashSet / TreeSetCopyOnWriteArraySet
QueueArrayDeque / linkedListArrayBlockingQueue / linkedBlockingQueue
DequeArrayDeque / linkedListlinkedBlockingDeque

使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap为例:

Map map = ConcurrentHashMap<>();
// 在不同的线程读写:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");

因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:

Map map = HashMap<>();
//改为
Map map = ConcurrentHashMap<>();

java.util.Collections工具类还提供了一个旧的线程安全集合转换器处理List/Set/Map

语法为 Collections.synchronizedXXX(Collection c)

Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);

它实际上是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。 二.什么是阻塞队列

在Java中,BlockingQueue的接口位于java.util.concurrent 包中(在JDK1.5开始提供),由上面介绍的阻塞队列的特性可知,阻塞队列是线程安全的。 1.在并发队列上JDK提供了两套实现

一个是以ConcurrentlinkedQueue为代表的高性能非阻塞队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种顶级接口都继承自Queue。

BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、linkedBlockingDeque、PriorityBlockingQueue、DelayQueue、SynchronousQueue 、linkedTransferQueue、linkedBlockingQueue等,阻塞队列的区别体现在存储结构上 或 对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

队列汇总

ArrayDeque:数组双端队列,非阻塞队列PriorityQueue:优先级队列,非阻塞队列ArrayBlockingQueue:常用基于数组的FIFO有界阻塞队列linkedBlockingQueue:常用基于链表的FIFO可选有界阻塞队列PriorityBlockingQueue,常用 带优先级的FIFO无界阻塞队列,SynchronousQueue常用 并发同步阻塞队列DelayQueue:延期阻塞队列,阻塞队列实现了BlockingQueue接口linkedBlockingDeque:基于链表的FIFO双端阻塞队列ConcurrentlinkedQueue:基于链表节点的无锁非阻塞队列ConcurrentlinkedDeque:基于链表节点的无锁双端非阻塞队列 2.阻塞队列与普通队列的区别

2句话概括

在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。

取自文章【Java多线程】JUC之线程池(六)手写阻塞队列-第一节

3.有界和无界的区别

有界的意思是它的容量是有限的,即在初始化时必须指定它的容器大小,且容量大小一旦指定就不可改变。无界,即不指定容量大小,采用Integer.MAX_VALUE为的默认容量 4.ReentrantLock和Condition

ReentrantLock 称为可重入独占锁,详见文章【Java多线程】JUC之显示锁(Lock)与初识AQS(队列同步器)第三节

Condition称为等待条件,详见文章 【Java多线程】JUC之深入队列同步器(AQS)(二)ConditionObject源码解析 第二节

ReentrantLock主要方法:

lock():获得锁lockInterruptibly():获得锁,支持响应中断tryLock():尝试获得锁,成功返回true,否则false,该方法不等待,立即返回tryLock(long time,TimeUnit unit):在给定时间内尝试获得锁,支持响应中断unlock():释放锁Condition:通过ReentrantLock实例出来,await()、signal()方法分别对应之前的Object的wait()和notify()

Condition主要方法

await():当前线程进入等待状态,同时释放锁,支持响应中断awaitUninterruptibly():当前线程进入等待状态,同时释放锁,不支持响应中断signal():用于唤醒一个在等待的线程singalAll():用于唤醒所有在等待的线程 5.BlockingQueue接口

public interface BlockingQueue extends Queue {
	//-----------------写入start----------------------
    //写入元素至队尾,成功返回true, 如果队列已满,抛出IllegalStateException("Queue full")
    //如只往指定长度的队列中写入值,推荐使用offer()方法。
    boolean add(E e);

    //写入元素至队尾,成功返回true, 如果队列已满,返回false, e的值不能为空,否则抛出NullPointerException。
    boolean offer(E e);

	//写入元素至队尾, 如果队列已满, 则阻塞调用线程直到队列有空闲空间.
    void put(E e) throws InterruptedException;

    //写入元素至队尾, 如果队列已满, 则限时阻塞调用线程,直到队列有空闲空间或超时.
    boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;
	//-----------------写入end----------------------
	

	//-----------------读取start----------------------
    //从队首读取并移除元素,如果队列为空, 则阻塞调用线程直到队列中有元素写入.
    E take() throws InterruptedException;

    //从队首读取并移除元素,如果队列为空, 则限时阻塞调用线程,直到队列中有元素写入或超时.
    E poll(long timeout, TimeUnit unit)throws InterruptedException;
	//-----------------读取end----------------------



	//-----------------移除元素start----------------------
	//从队列中移除指定的值。若队列为空,抛出NoSuchElementException异常
    boolean remove(Object o);

    //将队列中值,全部移除,并发设置到给定的集合中。
    int drainTo(Collection c);

    //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
    int drainTo(Collection c, int maxElements);
	//-----------------移除元素end----------------------


    //获取队列中剩余的空间。
    int remainingCapacity();

    //判断队列中是否拥有该值。
    public boolean contains(Object o);
}
6.主要操作
如果队列满了(插入)或者为空(移除)抛出异常立即返回阻塞(响应中断)超时退出(响应中断)
写入boolean add(E e)boolean offer(e)*void put(E e) *boolean offer(E e, long timeout, TimeUnit unit)*
读取(移除)boolean remove(Object o)E poll() *E take() *E poll(long timeout, TimeUnit unit) *
检查E element()E peek()N/AN/A

异常:

当队列已满时往队列里写入元素,会抛出IllegalStateException("Queue full")异常当队列为空时从队列里读取元素时会抛出NoSuchElementException异常 。

立即返回:

写入元素会返回是否成功,成功则返回true,否则fallse。读取元素,如果有数据直接返回,如果没有则返回null

阻塞:

当队列已满时往队列里写入元素,会一直阻塞生产者线程,直到队列中有多余的空间,或者响应中断退出。当队列为空时,从队列里读取元素,会阻塞消费者线程,直到队列可用。

超时退出:

当队列已满往队列里写入元素,会阻塞生产者线程一定时间,写入超时则返回false,否则true当队列为空从队列里读取元素,会阻塞消费者线程一定时间,读取超时则返回false,否则true

!!!注意: remove(Object o)可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。

7.如何选择合适的队列
    边界空间吞吐量
三.初步使用阻塞队列 1.ArrayBlockingQueue(数组阻塞队列)

ArrayBlockingQueue:由数组实现的有界阻塞队列,在初始化时必须指定容器大小,按照FIFO的方式存储元素。内部使用ReentrantLock和Condition实现, 支持公平锁和非公平锁。

如果线程按顺序申请锁就是公平的,否则就是不公平的容量大小一旦设置就无法更改

部分源码

final Object[] items;
//唯一全局锁,:掌管所有访问操作的锁。全局共享。都会使用这个锁。
final ReentrantLock lock;
//两个等待队列

private final Condition notEmpty;

private final Condition notFull;

//写入元素方法
public void put(E e) throws InterruptedException {
     checkNotNull(e);
     final ReentrantLock lock = this.lock; // 唯一锁
     lock.lockInterruptibly();// 加锁
     try {
            while (count == items.length)
                notFull.await();//await 让出操作权
            enqueue(e);// 被唤醒就加入队列。
      } finally {
            lock.unlock();// 解锁
      }
}

//移除元素方法
public E take() throws InterruptedException {
     final ReentrantLock lock = this.lock; // 加锁
     lock.lockInterruptibly();
     try {
       while (count == 0)//为空则释放当前锁
           notEmpty.await();
           return dequeue();// 获得锁被唤醒了则返回数据
      } finally {
            lock.unlock();// 释放锁
     }
}

使用方式

    @Test
    public void testArrayBlockingQueue() throws InterruptedException {
        ArrayBlockingQueue queue = new ArrayBlockingQueue(5);

        //生产者(添加元素)
        new Thread(() -> {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    queue.put(data);
                    System.out.println(Thread.currentThread().getName()+" put: " + data + "——size:" + queue.size());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        //消费者1(取出元素)
        new Thread(() -> {
            while (true) {
                try {
                    String data = queue.take();
                    System.out.println(Thread.currentThread().getName() + " take(): " + data + "——size:" + queue.size());
                    Thread.sleep(1200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();


        //休眠100S防止主线程直接结束
        Thread.sleep(100000);
    }

执行结果

2.linkedBlockingQueue(单向链表阻塞队列)

linkedBlockingQueue:由单向链表结构组成的有界阻塞队列,队列容量大小可选,默认大小为Integer.MAX_VALUE。按照FIFO的方式对存储元素进行排序。,吞吐量通常要高于ArrayBlockingQuene

该队列内部计数用的原子类 AtomicIntegernewFixedThreadPool线程池使用了这个队列

部分源码

//节点类,用于存储数据
static class Node {
    E item;
    Node next;
    Node(E x) { item = x; }
}

// 阻塞队列的大小,默认为Integer.MAX_VALUE 
private final int capacity;
//当前阻塞队列中的元素个数 
private final AtomicInteger count = new AtomicInteger();
// 阻塞队列的头节点
transient Node head;
// 阻塞队列的尾节点
private transient Node last;
// 获取并移除元素时使用的锁,如take, poll
private final ReentrantLock takeLock = new ReentrantLock();
//  notEmpty条件对象,当队列没有数据时用于 挂起 执行删除的线程
private final Condition notEmpty = takeLock.newCondition();
// 添加元素时使用的锁如 put, offer
private final ReentrantLock putLock = new ReentrantLock();
// notFull条件对象,当队列数据已满时用于 挂起 执行添加的线程
private final Condition notFull = putLock.newCondition();

写入到linkedBlockingQueue队列中的元素都将被封装成Node节点,添加的链表队列中,其中head和last分别指向队列的头节点和尾节点。与ArrayBlockingQueue不同的是,linkedBlockingQueue内部分别使用了takeLock和putLock 对并发进行控制,也就是说,写入和移除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。

使用方式

@Test
    public void testlinkedBlockingQueue() throws InterruptedException {
        linkedBlockingQueue queue = new linkedBlockingQueue(5);

        //生产者(添加元素)
        new Thread( () -> {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    queue.put(data);
                      System.out.println(Thread.currentThread().getName()+" put: " + data + "——size:" + queue.size());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        //消费者1(取出元素)
        new Thread( () -> {
            while (true) {
                try {
                    String data = queue.take();
                    System.out.println(Thread.currentThread().getName() + " take: " + data+"——size:"+queue.size());
                    Thread.sleep(1200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();


        //休眠100S防止主线程直接结束
        Thread.sleep(100000);
    }

执行效果:

3.PriorityBlockingQueue(优先级阻塞队列)

PriorityBlockingQueue: 使用平衡二叉树堆实现的,支持按优先级排序的无界阻塞队列,写入的对象必须实现Comparable接口,或在队列构造方法传入比较器Comparator。默认按照自然顺序升序排序,缺点是不能保证同优先级元素的顺序。内部使用ReentrantLock和Condition实现

使用CAS自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行允许写入null元素二叉堆分类

最大堆:父节点的键值总是大于或者等于任何一个子节点的键值最小堆:父节点的键值总是小于或者等于任何一个子节点的键值

具体表现为:添加操作则是不断“上冒”,而删除操作则是不断“下掉”

部分源码

构造方法PriorityBlockingQueue(Collection c)分析,如下所示

public PriorityBlockingQueue(Collection c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true;     // true if not known to be in heap order
    boolean screen = true;      // true if must screen for nulls
 
    if (c instanceof SortedSet) {                        // 如果是有序集合
        SortedSet ss = (SortedSet) c;
        this.comparator = (Comparator) ss.comparator();
        heapify = false;
    } else if (c instanceof PriorityBlockingQueue) {     // 如果是优先级队列
        PriorityBlockingQueue pq = (PriorityBlockingQueue) c;
        this.comparator = (Comparator) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class)   // exact match
            heapify = false;
    }
 
    Object[] a = c.toArray();
    int n = a.length;
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {    // 校验是否存在null元素
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify)    // 堆排序
        heapify();
}

插入元素 offer()方法分析

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
 
    final ReentrantLock lock = this.lock;   // 加锁
    lock.lock();
 
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))    // 队列已满, 则进行扩容
        tryGrow(array, cap);
 
    try {
        Comparator cmp = comparator;
        if (cmp == null)    // 比较器为空, 则按照元素的自然顺序进行堆调整
            siftUpComparable(n, e, array);
        else                // 比较器非空, 则按照比较器进行堆调整
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;       // 队列元素总数+1
        notEmpty.signal();  // 唤醒一个可能正在等待的"出队线程"
    } finally {
        lock.unlock();
    }
    return true;
}

上面最关键的是siftUpComparable() 和 siftUpUsingComparator()方法,这2个方法内部几乎一样,只不过前者是根据元素的自然顺序比较,后者则根据外部比较器比较,我们重点看下siftUpComparable()方法:

private static  void siftUpComparable(int k, T x, Object[] array) {
    Comparable key = (Comparable) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;     // 相当于(k-1)除2, 就是求k结点的父结点索引parent
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)  // 如果插入的结点值大于父结点, 则退出
            break;
 
        // 否则,交换父结点和当前结点的值
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

siftUpComparable()方法的作用其实就是堆的“上浮调整”,可以把堆可以想象成一棵完全二叉树,每次插入元素都链接到二叉树的最右下方,然后将插入的元素与其父结点比较,如果父结点大,则交换元素,直到没有父结点比插入的结点大为止。这样就保证了堆顶(二叉树的根结点)一定是最小的元素。(注:以上仅针对“小顶堆”)

扩容 tryGrow()方法

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock();  // 扩容和入队/出队可以同时进行, 所以先释放全局锁
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                    0, 1)) {    // allocationSpinLock置1表示正在扩容
        try {
            // 计算新的数组大小
            int newCap = oldCap + ((oldCap < 64) ?
                    (oldCap + 2) :
                    (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // 溢出判断
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];  // 分配新数组
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null)   // 扩容失败(可能有其它线程正在扩容,导致allocationSpinLock竞争失败)
        Thread.yield();
    
    lock.lock();            // 获取全局锁(因为要修改内部数组queue)
    if (newArray != null && queue == array) {
        queue = newArray;   // 指向新的内部数组
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

出队 take() 方法分析

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();   // 获取全局锁
    E result;
    try {
        while ((result = dequeue()) == null)    // 队列为空
            notEmpty.await();                   // 线程在noEmpty条件队列等待
    } finally {
        lock.unlock();
    }
    return result;
}
 
private E dequeue() {
    int n = size - 1;   // n表示出队后的剩余元素个数
    if (n < 0)          // 队列为空, 则返回null
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];    // array[0]是堆顶结点, 每次出队都删除堆顶结点
        E x = (E) array[n];         // array[n]是堆的最后一个结点, 也就是二叉树的最右下结点
        array[n] = null;
        Comparator cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

使用方式

    @Test
    public void testPriorityBlockingQueue() throws InterruptedException {
        //使用默认排序方式,即按自然顺序排序(即从小到大),可以通过元素实现Comparable接口或者构造时传入Comparator进行自定义排序
        PriorityBlockingQueue queue = new PriorityBlockingQueue(5);
        queue.put(6);
        queue.put(4);
        queue.put(3);
        queue.put(1);
        queue.put(2);
        queue.put(7);

        System.out.println(queue.poll());//1
        System.out.println(queue.poll());//2
    }

执行结果

4.DelayQueue(延迟阻塞队列)

DelayQueue: 一个内部使用优先级队列PriorityQueue实现延迟读取 的无界阻塞队列,该队列中每个元素均有过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最块要过期的元素。内部使用ReentrantLock和Condition实现。

写入的元素的必须实现Delay接口,指定从队列中获取当前元素的时间。存储元素按到期时间排序的进行排序

该接口要求实现类须重写compareTo()方法,与getDelay()方法结合使用(下面有具体事例说明)

//
public interface Delayed extends Comparable {
    long getDelay(TimeUnit unit);
}

newScheduledThreadPool线程池内部使用了DelayQueue队列。应用场景:

1.缓存系统的设计:通过DelayQueue保存缓存元素的有效期,开一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了2.定时任务调度: 通过DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)

部分源码

插入元素offer()方法

public boolean offer(E e) {
    // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 元素加入优先级队列
        q.offer(e);
        // 获取优先级头元素,头元素等于当前元素
        // 清空leader,并放开读限制
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

出队方法 take(), 如果为空当前线程阻塞

public E take() throws InterruptedException {
    // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 自旋
        for (;;) {
            // 获取优先级队列头节点
            E first = q.peek();
            // 优先级队列为空
            if (first == null)
                // 阻塞
                available.await();
            else {
                // 判断头元素剩余时间是否小于等于0
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 优先级队列出队
                    return q.poll();
                // 到这,说明剩余时间大于0
                // 头引用置空
                first = null;
                // leader线程是否为空,不为空就等待
                if (leader != null)
                    available.await();
                else {
                    // 设置leader线程为当前线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 休眠剩余秒
                        available.awaitNanos(delay);
                    } finally {
                        // 休眠结束,leader线程还是当前线程
                        // 置空leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // leader线程为空,并且first不为空
        // 唤醒阻塞的leader,让它再去试一次
        if (leader == null && q.peek() != null)
            available.signal();
        // 解锁
        lock.unlock();
    }
}

使用方式

//DelayQueue保存的元素
public class Item implements Delayed {
    String name;

    //触发时间
    private long time;

    public Item(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return time - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        Item item = (Item) o;
        long diff = this.time - item.time;
        if (diff <= 0) {// 改成>=会造成问题
            return -1;
        } else {
            return 1;
        }
    }

    @Override
    public String toString() {
        return "Item{" +
                "time=" + time +
                ", name='" + name + ''' +
                '}';
    }

    @Test
    public void testDelayQueue() throws InterruptedException {
        Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
        Item item2 = new Item("item2", 10, TimeUnit.SECONDS);
        Item item3 = new Item("item3", 15, TimeUnit.SECONDS);

        DelayQueue queue = new DelayQueue<>();
        queue.put(item1);
        queue.put(item2);
        queue.put(item3);

        System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        for (int i = 0; i < 3; i++) {
            Item take = queue.take();
            System.out.format("name:{%s}, time:{%s}n", take.name, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
        }
        
    }
}

具体实例:一个订单系统,通过阻塞队列延时功能实现订单的延迟消费,需要(订单类,包装订单类,生产者,消费者,测试)

订单类

public class Order {
    //订单的编号
    private final String orderNo;
    //订单金额
    private final double orderMoney;

    public Order(String orderNo, double orderMoney) {
        super();
        this.orderNo = orderNo;
        this.orderMoney = orderMoney;
    }

    public String getOrderNo() {
        return orderNo;
    }

    public double getOrderMoney() {
        return orderMoney;
    }
}

包装类

// 类说明:存放到队列的元素
public class ItemVo implements Delayed {

    private long activeTime;//到期时间,单位毫秒
    private T object;

    //activeTime是个过期时长
    public ItemVo(long activeTime, T object) {
        super();
        this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime, TimeUnit.MILLISECONDS) + System.nanoTime();
        // 将传入的时长转换为超时的时刻
        this.object = object;
    }

    public T getObject() {
        return object;
    }

    //按照剩余时间排序
    @Override
    public int compareTo(Delayed o) {
        long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        return (d == 0) ? 0 : ((d > 0) ? 1 : -1);
    }

    //返回元素的剩余时间
    @Override
    public long getDelay(TimeUnit unit) {
        long d = unit.convert(this.activeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        return d;
    }
}

生产者

@Slf4j
public class ProducerOrder implements Runnable {
    private DelayQueue> queue;

    public ProducerOrder(DelayQueue> queue) {
        super();
        this.queue = queue;
    }

    @SneakyThrows
    @Override
    public void run() {
        while (true) {
            Random random = new Random();
            int num = random.nextInt(5000);//1-5000随机
            
            Order order = new Order("Order-" + num, num);
            ItemVo itemVo = new ItemVo(num, order);
            //插入
            queue.offer(itemVo);
            log.info("订单" + num + "ms后到期:" + order.getOrderNo());

            TimeUnit.MILLISECONDS.sleep(1500);
        }
    }
}

消费者

@Slf4j
public class CustomerOrder implements Runnable {
    private DelayQueue> queue;

    public FetchOrder(DelayQueue> queue) {
        super();
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                ItemVo item = queue.take();
                Order order = (Order) item.getObject();
                log.info("获取过期订单:" + order.getOrderNo());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

测试

public class DelayQueueTest {
    public static void main(String[] args) throws InterruptedException {
        //延迟队列
        DelayQueue> queue = new DelayQueue<>();
        //生产者
        new Thread(new ProducerOrder(queue)).start();
        //消费者
        new Thread(new CustomerOrder(queue)).start();

        //每隔1秒,打印个数字
        for (int i = 1; i < 1000; i++) {
            Thread.sleep(1000);
            System.out.println(i * 1000);
        }
    }
}

执行结果

5.SynchronousQueue(同步阻塞队列)

SynchronousQueue: 一个 不存储元素(没有容量) 的阻塞队列,,每个插入操作(put)必须等到另一个线程调用移除操作(take) (即: 写入元素必须被移除后才能继续写入新的元素),否则写入操作一直处于阻塞状。支持公平锁(TransferQueue-FIFO)和非公平锁(TransferStack-LIFO)。

应用场景:线程池newCachedThreadPool()就使用SynchronousQueue,这个线程池新任务到了如果有空闲线程则使用空闲线程执行(复用),没有就创建新线程,不会对任务进行缓存

常用于交换工作,生产者与消费者同步以传递某些信息、事件或任务

    @Test
    public void testSynchronousQueue() throws InterruptedException {
        SynchronousQueue queue = new SynchronousQueue();

        //生产者(添加元素)
        new Thread( () -> {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.println(Thread.currentThread().getName()+" put: " + data + "——size:" + queue.size());
                    queue.put(data);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        //消费者1(取出元素)
        new Thread( () -> {
            while (true) {
                try {
                    String data = queue.take();
                    System.out.println(Thread.currentThread().getName()+" take: " + data + "——size:" + queue.size());
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        Thread.sleep(100000);
    }

执行结果

6.linkedTransferQueue(链表可转移阻塞队列)

linkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相对于其它队列该类实现了TransferQueue接口,多了transfer()和tryTransfer()方法。

若当前存在一个正在等待transfer元素的消费者线程,就直接将元素 “交给” 等待者;否则会put当前元素到队尾,并进入阻塞状态,等待消费者线程transfer该元素。put()和 transfer()方法的区别:put() 是立即返回的, transfer() 是阻塞等待消费者拿到数据才返回。

思想: 采用预占模式。即: 消费者获取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队(put),然后消费者等待在这个节点上,后面生产者入队时(put)发现有一个元素为null的节点,生产者就不入队了(no put),直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素(take),从调用的方法返回。我们称这种节点操作为“匹配”方式。

简单来说就是:有就直接拿走,没有就占这这个位置直到拿到或者超时或者中断

并发编程—— linkedTransferQueue

public interface TransferQueue extends BlockingQueue {
    // 如果可能,立即将元素转移给等待的消费者。 
    // 更确切地说,如果存在消费者已经等待接收它(在 take 或 timed poll(long,TimeUnit)pol)中,则 立即传送指定的元素,否则返回 false。
    //非阻塞
    boolean tryTransfer(E e);

    // 将元素转移给消费者,如果需要的话等待。 
    // 更准确地说,如果存在一个消费者已经等待接收它(在 take 或timed poll(long,TimeUnit)pol)中,则立即传送指定的元素,否则 等待 直到 元素由消费者接收。
    //支持响应中断
    void transfer(E e) throws InterruptedException;

    // 上面方法的基础上设置超时时间,支持响应中断
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    // 如果至少有一位消费者在等待,则返回 true
    boolean hasWaitingConsumer();

    // 返回等待消费者人数的估计值
    int getWaitingConsumerCount();
}
7.linkedBlockingDeque(双向链表阻塞队列)

linkedBlockingDeque: 由双向链表组成的有界阻塞队列,队列容量大小可选,默认大小为Integer.MAX_VALUE。队头部和队尾都可以写入和移除元素,因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半锁的竞争

Deque特性: 队头和队尾都可以插入和移除元素相比于其他阻塞队列,linkedBlockingDeque多了addFirst()、addLast()、peekFirst()、peekLast()等方法,以XXXFirst结尾的方法,表示插入、获取获移除双端队列的队头元素。以xxxLast结尾的方法,表示插入、获取获移除双端队列的队尾元素。场景: 常用于工作窃取模式

使用方式

    @Test
    public void testlinkedBlockingDeque (){
        BlockingDeque blockingDeque = new linkedBlockingDeque<>(1);
        // offer,poll 线程安全/阻塞 api
        blockingDeque.offer("添加第一个元素");
        String item = blockingDeque.poll();
        System.out.println("poll item:" + item);

        // offer,poll 线程安全/如果失败抛出异常
        try {
            blockingDeque.put("添加第二个元素");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        try {
            String take = blockingDeque.take();
            System.out.println("take item:" + take);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // add,remove 不是线程安全的
        blockingDeque.add("添加第四个元素");
        blockingDeque.add("添加第五个元素");
        item = blockingDeque.remove();
        System.out.println(item);
    }

执行结果

8.ConcurrentlinkedQueue(非阻塞单向链表队列)

ConcurrentlinkedQueue: 一个适用于高并发场景下的队列,基于链接节点Node的无界非阻塞线程安全队列,使用CAS+volatile来实现线程安全(不具备阻塞功能)。按照FIFO的方式对存储元素进行排序,该队列不允许写入null元素。

通常ConcurrentlinkedQueue性能好于BlockingQueue.

使用方式

        // 非阻塞式队列,无界队列
        ConcurrentlinkedQueue queue = new ConcurrentlinkedQueue<>();
        queue.offer("张三");
        queue.offer("李四");
        queue.offer("王五");
        
        //从头获取元素,删除该元素
        System.out.println(queue.poll());
        //从头获取元素,不刪除该元素
        System.out.println(queue.peek());
        //获取容量大小
        System.out.println(queue.size());
9.ConcurrentlinkedDeque(非阻塞双向链表队列)

ConcurrentlinkedDeque: 基于双向链表实现的无界非阻塞线程安全队列,实现方式类似 ConcurrentlinkedQueue。

10.对比
阻塞锁数据结构是否有界线程安全适用场景
ArrayBlockingQueue数组有界,大小确认后不支持改变一把ReentrantLock锁控制put和take生产消费模型,平衡处理速度
linkedBlockingQueue单向链表可配置两把ReentrantLock锁,put和take可以并发执行。生产消费模型,平衡处理速度
linkedBlockingDeque双向链表可配置一把ReentrantLock锁控制put和take。生产消费模型,平衡处理速度
优先级队列 PriorityBlockingQueue二叉小顶堆无界,会自动扩容一把ReentrantLock锁控制put和take,队列为空的时候take进入条件等待;短信队列中的验证码短信优先发送
同步队列 SynchronousQueue单向链表或者栈容量为1CAS,put和take都会阻塞,直到配对成功为止线程之间传递数据
延迟队列 DelayQueuePriorityQueue,二叉小顶堆无界,会自动扩容一把ReentrantLock锁控制put和take,一次只能一个线程take,其他线程进入条件等待;关闭超时空连接,任务超时处理…
linkedTransferQueue单向链表无界CAS预占模式: 有就直接拿走,没有就占这这个位置直到拿到或者超时或者中断
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760573.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号