前言一.JUC
1.JUC结构图2.JUC并发容器3.JUC线程安全集合 二.什么是阻塞队列
1.在并发队列上JDK提供了两套实现2.阻塞队列与普通队列的区别3.有界和无界的区别4.ReentrantLock和Condition5.BlockingQueue接口6.主要操作7.如何选择合适的队列 三.初步使用阻塞队列
1.ArrayBlockingQueue(数组阻塞队列)2.linkedBlockingQueue(单向链表阻塞队列)3.PriorityBlockingQueue(优先级阻塞队列)4.DelayQueue(延迟阻塞队列)5.SynchronousQueue(同步阻塞队列)6.linkedTransferQueue(链表可转移阻塞队列)7.linkedBlockingDeque(双向链表阻塞队列)8.ConcurrentlinkedQueue(非阻塞单向链表队列)9.ConcurrentlinkedDeque(非阻塞双向链表队列)10.对比
前言我的与多线程相关文章
并发编程最佳学习路线
【Java多线程】高并发修炼基础之高并发必须了解的概念
【Java多线程】了解线程的锁池和等待池概念
【Java多线程】了解Java锁机制
【Java多线程】线程通信
【Java基础】多线程从入门到掌握-第十五节.使用Concurrent集合
【Java多线程】JUC之线程池(一)与线程池的初识第四节.线程池的工作队列
Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue。
| 接口 | 线程不安全 | 线程安全 |
|---|---|---|
| List | ArrayList | CopyOnWriteArrayList |
| Map | HashMap | ConcurrentHashMap |
| Set | HashSet / TreeSet | CopyOnWriteArraySet |
| Queue | ArrayDeque / linkedList | ArrayBlockingQueue / linkedBlockingQueue |
| Deque | ArrayDeque / linkedList | linkedBlockingDeque |
使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap为例:
Mapmap = ConcurrentHashMap<>(); // 在不同的线程读写: map.put("A", "1"); map.put("B", "2"); map.get("A", "1");
因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:
Mapmap = HashMap<>(); //改为 Map map = ConcurrentHashMap<>();
java.util.Collections工具类还提供了一个旧的线程安全集合转换器处理List/Set/Map
语法为 Collections.synchronizedXXX(Collection c)
Map unsafeMap = new HashMap(); Map threadSafeMap = Collections.synchronizedMap(unsafeMap);
它实际上是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。 二.什么是阻塞队列
在Java中,BlockingQueue的接口位于java.util.concurrent 包中(在JDK1.5开始提供),由上面介绍的阻塞队列的特性可知,阻塞队列是线程安全的。 1.在并发队列上JDK提供了两套实现
一个是以ConcurrentlinkedQueue为代表的高性能非阻塞队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种顶级接口都继承自Queue。
BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、linkedBlockingDeque、PriorityBlockingQueue、DelayQueue、SynchronousQueue 、linkedTransferQueue、linkedBlockingQueue等,阻塞队列的区别体现在存储结构上 或 对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
队列汇总
ArrayDeque:数组双端队列,非阻塞队列PriorityQueue:优先级队列,非阻塞队列ArrayBlockingQueue:常用基于数组的FIFO有界阻塞队列linkedBlockingQueue:常用基于链表的FIFO可选有界阻塞队列PriorityBlockingQueue,常用 带优先级的FIFO无界阻塞队列,SynchronousQueue常用 并发同步阻塞队列DelayQueue:延期阻塞队列,阻塞队列实现了BlockingQueue接口linkedBlockingDeque:基于链表的FIFO双端阻塞队列ConcurrentlinkedQueue:基于链表节点的无锁非阻塞队列ConcurrentlinkedDeque:基于链表节点的无锁双端非阻塞队列 2.阻塞队列与普通队列的区别
2句话概括
在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。
3.有界和无界的区别取自文章【Java多线程】JUC之线程池(六)手写阻塞队列-第一节
有界的意思是它的容量是有限的,即在初始化时必须指定它的容器大小,且容量大小一旦指定就不可改变。无界,即不指定容量大小,采用Integer.MAX_VALUE为的默认容量 4.ReentrantLock和Condition
ReentrantLock 称为可重入独占锁,详见文章【Java多线程】JUC之显示锁(Lock)与初识AQS(队列同步器)第三节
Condition称为等待条件,详见文章 【Java多线程】JUC之深入队列同步器(AQS)(二)ConditionObject源码解析 第二节
ReentrantLock主要方法:
lock():获得锁lockInterruptibly():获得锁,支持响应中断tryLock():尝试获得锁,成功返回true,否则false,该方法不等待,立即返回tryLock(long time,TimeUnit unit):在给定时间内尝试获得锁,支持响应中断unlock():释放锁Condition:通过ReentrantLock实例出来,await()、signal()方法分别对应之前的Object的wait()和notify()
Condition主要方法
await():当前线程进入等待状态,同时释放锁,支持响应中断awaitUninterruptibly():当前线程进入等待状态,同时释放锁,不支持响应中断signal():用于唤醒一个在等待的线程singalAll():用于唤醒所有在等待的线程 5.BlockingQueue接口
public interface BlockingQueue6.主要操作extends Queue { //-----------------写入start---------------------- //写入元素至队尾,成功返回true, 如果队列已满,抛出IllegalStateException("Queue full") //如只往指定长度的队列中写入值,推荐使用offer()方法。 boolean add(E e); //写入元素至队尾,成功返回true, 如果队列已满,返回false, e的值不能为空,否则抛出NullPointerException。 boolean offer(E e); //写入元素至队尾, 如果队列已满, 则阻塞调用线程直到队列有空闲空间. void put(E e) throws InterruptedException; //写入元素至队尾, 如果队列已满, 则限时阻塞调用线程,直到队列有空闲空间或超时. boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException; //-----------------写入end---------------------- //-----------------读取start---------------------- //从队首读取并移除元素,如果队列为空, 则阻塞调用线程直到队列中有元素写入. E take() throws InterruptedException; //从队首读取并移除元素,如果队列为空, 则限时阻塞调用线程,直到队列中有元素写入或超时. E poll(long timeout, TimeUnit unit)throws InterruptedException; //-----------------读取end---------------------- //-----------------移除元素start---------------------- //从队列中移除指定的值。若队列为空,抛出NoSuchElementException异常 boolean remove(Object o); //将队列中值,全部移除,并发设置到给定的集合中。 int drainTo(Collection super E> c); //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。 int drainTo(Collection super E> c, int maxElements); //-----------------移除元素end---------------------- //获取队列中剩余的空间。 int remainingCapacity(); //判断队列中是否拥有该值。 public boolean contains(Object o); }
| 如果队列满了(插入)或者为空(移除) | 抛出异常 | 立即返回 | 阻塞(响应中断) | 超时退出(响应中断) |
|---|---|---|---|---|
| 写入 | boolean add(E e) | boolean offer(e)* | void put(E e) * | boolean offer(E e, long timeout, TimeUnit unit)* |
| 读取(移除) | boolean remove(Object o) | E poll() * | E take() * | E poll(long timeout, TimeUnit unit) * |
| 检查 | E element() | E peek() | N/A | N/A |
异常:
当队列已满时往队列里写入元素,会抛出IllegalStateException("Queue full")异常当队列为空时从队列里读取元素时会抛出NoSuchElementException异常 。
立即返回:
写入元素会返回是否成功,成功则返回true,否则fallse。读取元素,如果有数据直接返回,如果没有则返回null
阻塞:
当队列已满时往队列里写入元素,会一直阻塞生产者线程,直到队列中有多余的空间,或者响应中断退出。当队列为空时,从队列里读取元素,会阻塞消费者线程,直到队列可用。
超时退出:
当队列已满往队列里写入元素,会阻塞生产者线程一定时间,写入超时则返回false,否则true当队列为空从队列里读取元素,会阻塞消费者线程一定时间,读取超时则返回false,否则true
7.如何选择合适的队列!!!注意: remove(Object o)可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。
- 边界空间吞吐量
ArrayBlockingQueue:由数组实现的有界阻塞队列,在初始化时必须指定容器大小,按照FIFO的方式存储元素。内部使用ReentrantLock和Condition实现, 支持公平锁和非公平锁。
如果线程按顺序申请锁就是公平的,否则就是不公平的容量大小一旦设置就无法更改
部分源码
final Object[] items;
//唯一全局锁,:掌管所有访问操作的锁。全局共享。都会使用这个锁。
final ReentrantLock lock;
//两个等待队列
private final Condition notEmpty;
private final Condition notFull;
//写入元素方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock; // 唯一锁
lock.lockInterruptibly();// 加锁
try {
while (count == items.length)
notFull.await();//await 让出操作权
enqueue(e);// 被唤醒就加入队列。
} finally {
lock.unlock();// 解锁
}
}
//移除元素方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; // 加锁
lock.lockInterruptibly();
try {
while (count == 0)//为空则释放当前锁
notEmpty.await();
return dequeue();// 获得锁被唤醒了则返回数据
} finally {
lock.unlock();// 释放锁
}
}
使用方式
@Test
public void testArrayBlockingQueue() throws InterruptedException {
ArrayBlockingQueue queue = new ArrayBlockingQueue(5);
//生产者(添加元素)
new Thread(() -> {
while (true) {
try {
String data = UUID.randomUUID().toString();
queue.put(data);
System.out.println(Thread.currentThread().getName()+" put: " + data + "——size:" + queue.size());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//消费者1(取出元素)
new Thread(() -> {
while (true) {
try {
String data = queue.take();
System.out.println(Thread.currentThread().getName() + " take(): " + data + "——size:" + queue.size());
Thread.sleep(1200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//休眠100S防止主线程直接结束
Thread.sleep(100000);
}
执行结果
linkedBlockingQueue:由单向链表结构组成的有界阻塞队列,队列容量大小可选,默认大小为Integer.MAX_VALUE。按照FIFO的方式对存储元素进行排序。,吞吐量通常要高于ArrayBlockingQuene
该队列内部计数用的原子类 AtomicIntegernewFixedThreadPool线程池使用了这个队列
部分源码
//节点类,用于存储数据 static class Node{ E item; Node next; Node(E x) { item = x; } } // 阻塞队列的大小,默认为Integer.MAX_VALUE private final int capacity; //当前阻塞队列中的元素个数 private final AtomicInteger count = new AtomicInteger(); // 阻塞队列的头节点 transient Node head; // 阻塞队列的尾节点 private transient Node last; // 获取并移除元素时使用的锁,如take, poll private final ReentrantLock takeLock = new ReentrantLock(); // notEmpty条件对象,当队列没有数据时用于 挂起 执行删除的线程 private final Condition notEmpty = takeLock.newCondition(); // 添加元素时使用的锁如 put, offer private final ReentrantLock putLock = new ReentrantLock(); // notFull条件对象,当队列数据已满时用于 挂起 执行添加的线程 private final Condition notFull = putLock.newCondition();
写入到linkedBlockingQueue队列中的元素都将被封装成Node节点,添加的链表队列中,其中head和last分别指向队列的头节点和尾节点。与ArrayBlockingQueue不同的是,linkedBlockingQueue内部分别使用了takeLock和putLock 对并发进行控制,也就是说,写入和移除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。
使用方式
@Test
public void testlinkedBlockingQueue() throws InterruptedException {
linkedBlockingQueue queue = new linkedBlockingQueue(5);
//生产者(添加元素)
new Thread( () -> {
while (true) {
try {
String data = UUID.randomUUID().toString();
queue.put(data);
System.out.println(Thread.currentThread().getName()+" put: " + data + "——size:" + queue.size());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//消费者1(取出元素)
new Thread( () -> {
while (true) {
try {
String data = queue.take();
System.out.println(Thread.currentThread().getName() + " take: " + data+"——size:"+queue.size());
Thread.sleep(1200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//休眠100S防止主线程直接结束
Thread.sleep(100000);
}
执行效果:
PriorityBlockingQueue: 使用平衡二叉树堆实现的,支持按优先级排序的无界阻塞队列,写入的对象必须实现Comparable接口,或在队列构造方法传入比较器Comparator。默认按照自然顺序升序排序,缺点是不能保证同优先级元素的顺序。内部使用ReentrantLock和Condition实现
使用CAS自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行允许写入null元素二叉堆分类
最大堆:父节点的键值总是大于或者等于任何一个子节点的键值最小堆:父节点的键值总是小于或者等于任何一个子节点的键值
具体表现为:添加操作则是不断“上冒”,而删除操作则是不断“下掉”
部分源码
构造方法PriorityBlockingQueue(Collection extends E> c)分析,如下所示
public PriorityBlockingQueue(Collection extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet>) { // 如果是有序集合
SortedSet extends E> ss = (SortedSet extends E>) c;
this.comparator = (Comparator super E>) ss.comparator();
heapify = false;
} else if (c instanceof PriorityBlockingQueue>) { // 如果是优先级队列
PriorityBlockingQueue extends E> pq = (PriorityBlockingQueue extends E>) c;
this.comparator = (Comparator super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) { // 校验是否存在null元素
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify) // 堆排序
heapify();
}
插入元素 offer()方法分析
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock; // 加锁
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length)) // 队列已满, 则进行扩容
tryGrow(array, cap);
try {
Comparator super E> cmp = comparator;
if (cmp == null) // 比较器为空, 则按照元素的自然顺序进行堆调整
siftUpComparable(n, e, array);
else // 比较器非空, 则按照比较器进行堆调整
siftUpUsingComparator(n, e, array, cmp);
size = n + 1; // 队列元素总数+1
notEmpty.signal(); // 唤醒一个可能正在等待的"出队线程"
} finally {
lock.unlock();
}
return true;
}
上面最关键的是siftUpComparable() 和 siftUpUsingComparator()方法,这2个方法内部几乎一样,只不过前者是根据元素的自然顺序比较,后者则根据外部比较器比较,我们重点看下siftUpComparable()方法:
private staticvoid siftUpComparable(int k, T x, Object[] array) { Comparable super T> key = (Comparable super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; // 相当于(k-1)除2, 就是求k结点的父结点索引parent Object e = array[parent]; if (key.compareTo((T) e) >= 0) // 如果插入的结点值大于父结点, 则退出 break; // 否则,交换父结点和当前结点的值 array[k] = e; k = parent; } array[k] = key; }
siftUpComparable()方法的作用其实就是堆的“上浮调整”,可以把堆可以想象成一棵完全二叉树,每次插入元素都链接到二叉树的最右下方,然后将插入的元素与其父结点比较,如果父结点大,则交换元素,直到没有父结点比插入的结点大为止。这样就保证了堆顶(二叉树的根结点)一定是最小的元素。(注:以上仅针对“小顶堆”)
扩容 tryGrow()方法
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // 扩容和入队/出队可以同时进行, 所以先释放全局锁
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) { // allocationSpinLock置1表示正在扩容
try {
// 计算新的数组大小
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) :
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // 溢出判断
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap]; // 分配新数组
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // 扩容失败(可能有其它线程正在扩容,导致allocationSpinLock竞争失败)
Thread.yield();
lock.lock(); // 获取全局锁(因为要修改内部数组queue)
if (newArray != null && queue == array) {
queue = newArray; // 指向新的内部数组
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
出队 take() 方法分析
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 获取全局锁
E result;
try {
while ((result = dequeue()) == null) // 队列为空
notEmpty.await(); // 线程在noEmpty条件队列等待
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1; // n表示出队后的剩余元素个数
if (n < 0) // 队列为空, 则返回null
return null;
else {
Object[] array = queue;
E result = (E) array[0]; // array[0]是堆顶结点, 每次出队都删除堆顶结点
E x = (E) array[n]; // array[n]是堆的最后一个结点, 也就是二叉树的最右下结点
array[n] = null;
Comparator super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
使用方式
@Test
public void testPriorityBlockingQueue() throws InterruptedException {
//使用默认排序方式,即按自然顺序排序(即从小到大),可以通过元素实现Comparable接口或者构造时传入Comparator进行自定义排序
PriorityBlockingQueue queue = new PriorityBlockingQueue(5);
queue.put(6);
queue.put(4);
queue.put(3);
queue.put(1);
queue.put(2);
queue.put(7);
System.out.println(queue.poll());//1
System.out.println(queue.poll());//2
}
执行结果
DelayQueue: 一个内部使用优先级队列PriorityQueue实现延迟读取 的无界阻塞队列,该队列中每个元素均有过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最块要过期的元素。内部使用ReentrantLock和Condition实现。
写入的元素的必须实现Delay接口,指定从队列中获取当前元素的时间。存储元素按到期时间排序的进行排序
该接口要求实现类须重写compareTo()方法,与getDelay()方法结合使用(下面有具体事例说明)
// public interface Delayed extends Comparable{ long getDelay(TimeUnit unit); }
newScheduledThreadPool线程池内部使用了DelayQueue队列。应用场景:
1.缓存系统的设计:通过DelayQueue保存缓存元素的有效期,开一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了2.定时任务调度: 通过DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)
部分源码
插入元素offer()方法
public boolean offer(E e) {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 元素加入优先级队列
q.offer(e);
// 获取优先级头元素,头元素等于当前元素
// 清空leader,并放开读限制
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
// 释放锁
lock.unlock();
}
}
出队方法 take(), 如果为空当前线程阻塞
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 获取优先级队列头节点
E first = q.peek();
// 优先级队列为空
if (first == null)
// 阻塞
available.await();
else {
// 判断头元素剩余时间是否小于等于0
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 优先级队列出队
return q.poll();
// 到这,说明剩余时间大于0
// 头引用置空
first = null;
// leader线程是否为空,不为空就等待
if (leader != null)
available.await();
else {
// 设置leader线程为当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 休眠剩余秒
available.awaitNanos(delay);
} finally {
// 休眠结束,leader线程还是当前线程
// 置空leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader线程为空,并且first不为空
// 唤醒阻塞的leader,让它再去试一次
if (leader == null && q.peek() != null)
available.signal();
// 解锁
lock.unlock();
}
}
使用方式
//DelayQueue保存的元素
public class Item implements Delayed {
String name;
//触发时间
private long time;
public Item(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Item item = (Item) o;
long diff = this.time - item.time;
if (diff <= 0) {// 改成>=会造成问题
return -1;
} else {
return 1;
}
}
@Override
public String toString() {
return "Item{" +
"time=" + time +
", name='" + name + ''' +
'}';
}
@Test
public void testDelayQueue() throws InterruptedException {
Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
Item item2 = new Item("item2", 10, TimeUnit.SECONDS);
Item item3 = new Item("item3", 15, TimeUnit.SECONDS);
DelayQueue- queue = new DelayQueue<>();
queue.put(item1);
queue.put(item2);
queue.put(item3);
System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
for (int i = 0; i < 3; i++) {
Item take = queue.take();
System.out.format("name:{%s}, time:{%s}n", take.name, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
}
}
}
具体实例:一个订单系统,通过阻塞队列延时功能实现订单的延迟消费,需要(订单类,包装订单类,生产者,消费者,测试)
订单类
public class Order {
//订单的编号
private final String orderNo;
//订单金额
private final double orderMoney;
public Order(String orderNo, double orderMoney) {
super();
this.orderNo = orderNo;
this.orderMoney = orderMoney;
}
public String getOrderNo() {
return orderNo;
}
public double getOrderMoney() {
return orderMoney;
}
}
包装类
// 类说明:存放到队列的元素 public class ItemVoimplements Delayed { private long activeTime;//到期时间,单位毫秒 private T object; //activeTime是个过期时长 public ItemVo(long activeTime, T object) { super(); this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime, TimeUnit.MILLISECONDS) + System.nanoTime(); // 将传入的时长转换为超时的时刻 this.object = object; } public T getObject() { return object; } //按照剩余时间排序 @Override public int compareTo(Delayed o) { long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return (d == 0) ? 0 : ((d > 0) ? 1 : -1); } //返回元素的剩余时间 @Override public long getDelay(TimeUnit unit) { long d = unit.convert(this.activeTime - System.nanoTime(), TimeUnit.NANOSECONDS); return d; } }
生产者
@Slf4j
public class ProducerOrder implements Runnable {
private DelayQueue> queue;
public ProducerOrder(DelayQueue> queue) {
super();
this.queue = queue;
}
@SneakyThrows
@Override
public void run() {
while (true) {
Random random = new Random();
int num = random.nextInt(5000);//1-5000随机
Order order = new Order("Order-" + num, num);
ItemVo itemVo = new ItemVo(num, order);
//插入
queue.offer(itemVo);
log.info("订单" + num + "ms后到期:" + order.getOrderNo());
TimeUnit.MILLISECONDS.sleep(1500);
}
}
}
消费者
@Slf4j
public class CustomerOrder implements Runnable {
private DelayQueue> queue;
public FetchOrder(DelayQueue> queue) {
super();
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
ItemVo item = queue.take();
Order order = (Order) item.getObject();
log.info("获取过期订单:" + order.getOrderNo());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
测试
public class DelayQueueTest {
public static void main(String[] args) throws InterruptedException {
//延迟队列
DelayQueue> queue = new DelayQueue<>();
//生产者
new Thread(new ProducerOrder(queue)).start();
//消费者
new Thread(new CustomerOrder(queue)).start();
//每隔1秒,打印个数字
for (int i = 1; i < 1000; i++) {
Thread.sleep(1000);
System.out.println(i * 1000);
}
}
}
执行结果
5.SynchronousQueue(同步阻塞队列)SynchronousQueue: 一个 不存储元素(没有容量) 的阻塞队列,,每个插入操作(put)必须等到另一个线程调用移除操作(take) (即: 写入元素必须被移除后才能继续写入新的元素),否则写入操作一直处于阻塞状。支持公平锁(TransferQueue-FIFO)和非公平锁(TransferStack-LIFO)。
应用场景:线程池newCachedThreadPool()就使用SynchronousQueue,这个线程池新任务到了如果有空闲线程则使用空闲线程执行(复用),没有就创建新线程,不会对任务进行缓存
常用于交换工作,生产者与消费者同步以传递某些信息、事件或任务
@Test
public void testSynchronousQueue() throws InterruptedException {
SynchronousQueue queue = new SynchronousQueue();
//生产者(添加元素)
new Thread( () -> {
while (true) {
try {
String data = UUID.randomUUID().toString();
System.out.println(Thread.currentThread().getName()+" put: " + data + "——size:" + queue.size());
queue.put(data);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//消费者1(取出元素)
new Thread( () -> {
while (true) {
try {
String data = queue.take();
System.out.println(Thread.currentThread().getName()+" take: " + data + "——size:" + queue.size());
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(100000);
}
执行结果
linkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相对于其它队列该类实现了TransferQueue接口,多了transfer()和tryTransfer()方法。
若当前存在一个正在等待transfer元素的消费者线程,就直接将元素 “交给” 等待者;否则会put当前元素到队尾,并进入阻塞状态,等待消费者线程transfer该元素。put()和 transfer()方法的区别:put() 是立即返回的, transfer() 是阻塞等待消费者拿到数据才返回。
思想: 采用预占模式。即: 消费者获取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队(put),然后消费者等待在这个节点上,后面生产者入队时(put)发现有一个元素为null的节点,生产者就不入队了(no put),直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素(take),从调用的方法返回。我们称这种节点操作为“匹配”方式。
简单来说就是:有就直接拿走,没有就占这这个位置直到拿到或者超时或者中断
并发编程—— linkedTransferQueue
public interface TransferQueue7.linkedBlockingDeque(双向链表阻塞队列)extends BlockingQueue { // 如果可能,立即将元素转移给等待的消费者。 // 更确切地说,如果存在消费者已经等待接收它(在 take 或 timed poll(long,TimeUnit)pol)中,则 立即传送指定的元素,否则返回 false。 //非阻塞 boolean tryTransfer(E e); // 将元素转移给消费者,如果需要的话等待。 // 更准确地说,如果存在一个消费者已经等待接收它(在 take 或timed poll(long,TimeUnit)pol)中,则立即传送指定的元素,否则 等待 直到 元素由消费者接收。 //支持响应中断 void transfer(E e) throws InterruptedException; // 上面方法的基础上设置超时时间,支持响应中断 boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; // 如果至少有一位消费者在等待,则返回 true boolean hasWaitingConsumer(); // 返回等待消费者人数的估计值 int getWaitingConsumerCount(); }
linkedBlockingDeque: 由双向链表组成的有界阻塞队列,队列容量大小可选,默认大小为Integer.MAX_VALUE。队头部和队尾都可以写入和移除元素,因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半锁的竞争
Deque特性: 队头和队尾都可以插入和移除元素相比于其他阻塞队列,linkedBlockingDeque多了addFirst()、addLast()、peekFirst()、peekLast()等方法,以XXXFirst结尾的方法,表示插入、获取获移除双端队列的队头元素。以xxxLast结尾的方法,表示插入、获取获移除双端队列的队尾元素。场景: 常用于工作窃取模式
使用方式
@Test
public void testlinkedBlockingDeque (){
BlockingDeque blockingDeque = new linkedBlockingDeque<>(1);
// offer,poll 线程安全/阻塞 api
blockingDeque.offer("添加第一个元素");
String item = blockingDeque.poll();
System.out.println("poll item:" + item);
// offer,poll 线程安全/如果失败抛出异常
try {
blockingDeque.put("添加第二个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
String take = blockingDeque.take();
System.out.println("take item:" + take);
} catch (InterruptedException e) {
e.printStackTrace();
}
// add,remove 不是线程安全的
blockingDeque.add("添加第四个元素");
blockingDeque.add("添加第五个元素");
item = blockingDeque.remove();
System.out.println(item);
}
执行结果
ConcurrentlinkedQueue: 一个适用于高并发场景下的队列,基于链接节点Node的无界非阻塞线程安全队列,使用CAS+volatile来实现线程安全(不具备阻塞功能)。按照FIFO的方式对存储元素进行排序,该队列不允许写入null元素。
通常ConcurrentlinkedQueue性能好于BlockingQueue.
使用方式
// 非阻塞式队列,无界队列
ConcurrentlinkedQueue queue = new ConcurrentlinkedQueue<>();
queue.offer("张三");
queue.offer("李四");
queue.offer("王五");
//从头获取元素,删除该元素
System.out.println(queue.poll());
//从头获取元素,不刪除该元素
System.out.println(queue.peek());
//获取容量大小
System.out.println(queue.size());
9.ConcurrentlinkedDeque(非阻塞双向链表队列)
ConcurrentlinkedDeque: 基于双向链表实现的无界非阻塞线程安全队列,实现方式类似 ConcurrentlinkedQueue。
10.对比| 阻塞锁 | 数据结构 | 是否有界 | 线程安全 | 适用场景 |
|---|---|---|---|---|
| ArrayBlockingQueue | 数组 | 有界,大小确认后不支持改变 | 一把ReentrantLock锁控制put和take | 生产消费模型,平衡处理速度 |
| linkedBlockingQueue | 单向链表 | 可配置 | 两把ReentrantLock锁,put和take可以并发执行。 | 生产消费模型,平衡处理速度 |
| linkedBlockingDeque | 双向链表 | 可配置 | 一把ReentrantLock锁控制put和take。 | 生产消费模型,平衡处理速度 |
| 优先级队列 PriorityBlockingQueue | 二叉小顶堆 | 无界,会自动扩容 | 一把ReentrantLock锁控制put和take,队列为空的时候take进入条件等待; | 短信队列中的验证码短信优先发送 |
| 同步队列 SynchronousQueue | 单向链表或者栈 | 容量为1 | CAS,put和take都会阻塞,直到配对成功为止 | 线程之间传递数据 |
| 延迟队列 DelayQueue | PriorityQueue,二叉小顶堆 | 无界,会自动扩容 | 一把ReentrantLock锁控制put和take,一次只能一个线程take,其他线程进入条件等待; | 关闭超时空连接,任务超时处理… |
| linkedTransferQueue | 单向链表 | 无界 | CAS | 预占模式: 有就直接拿走,没有就占这这个位置直到拿到或者超时或者中断 |



