为了应对高并发过程中,数据一致性的问题,java中设计了一些并发容器。本篇将从容器的类别出发,介绍各个容器的特点及原理,为接下来的线程池做准备。
List相关 VectorVector是最古老的并发容器,其实现了List接口,方法都是默认加synchronized的,所以效率很低,现在基本不会用它。
示例用到的锁 synchronized
public synchronized int lastIndexOf(Object o) {
return lastIndexOf(o, elementCount-1);
}
CopyOnWriteArrayList
读时没影响不加锁,写时加锁,copy一个新list,然后扩展一个新元素,老引用指向新的。
示例用到的锁 synchronized
// 写时加锁
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
常见同步方法
Set相关
CopyOnWriteArraySet
底层是数组,读时没影响不加锁,写时加锁,copy一个新list,然后扩展一个新元素,老引用指向新的。
ConcurrentSkipListSetConcurrentSkipListSet是线程安全的有序的哈希表,其底层用的是跳表,其特点主要在有序上面。
示例用到的锁 synchronized
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node[] tab = table;;) {
Node f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { // 加锁
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
if ((e = e.next) == null) {
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
Map相关
HashTable
和Vector一样古老的并发容器,其实现了Map接口,方法都是默认加synchronized的,效率低,基本也不用了。
示例用到的锁 synchronized
public synchronized boolean isEmpty() {
return count == 0;
}
HashMap
HashMap的方法默认都是不加锁的,可以通过容器工具类的方法Collections.synchronizedMap(new HashMap
ConcurrentHashMap底层用的分段锁,所以其在效率上会有所提升,主要体现在读上面。由于它往里插的时候内部做了各种各样的判断,本来是链表的,到8之后又变成了红黑树,然后里面又做了各种各样的cas的判断,所以他往里插的数据相比HashTable还要低一点。
ConcurrentSkipListMapConcurrentSkipListMap是线程安全的有序的哈希表,其底层用的是跳表。其特点主要在有序上面,效率相较ConcurrentHashMap略低。
Queue相关 LinkedBlockingDeque双端阻塞队列,实现put阻塞,take阻塞。底层用的是LockSupport.park(),自然而然的实现生产者,消费者模型。
示例用到的锁 ReentrantLock
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node node = new Node(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
ArrayBlockingQueue
底层是数组实现的阻塞队列,实现put阻塞,take阻塞。底层用的是LockSupport.park(),自然而然的实现生产者,消费者模型。
示例用到的锁 ReentrantLock
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
PriorityBlockingQueue
底层用的堆结构,除了阻塞队列的特点外,可以根据比较器,每次取值获取最大值或最小值。
LinkedBlockingQueue底层是单向链表实现的阻塞队列,实现put阻塞,take阻塞。底层用的是LockSupport.park(),自然而然的实现生产者,消费者模型。
SynchronousQueueSynchronousQueue容量为0,它不是用来装内容的,SynchronousQueue是专门用来两个线程之间传内容的。使用中一个线程take阻塞,等待另一个线程put,或者一个线程put阻塞,等待另一个线程take。
用到的锁 CAS自旋锁
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
LinkedTransferQueue
TransferQueue可以给线程来传递任务,以此同时不像是SynchronousQueue只能传递一个,TransferQueue做成列表可以传好多个。它添加了一个方法叫transfer,如果我们用put就相当于一个线程来了往里一装它就走了。transfer就是装完在这等着,阻塞等有人把它取走我这个线程才回去干我自己的事情。
示例用到的锁 LockSupport
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
if (item != p && (item != null) == isData) { // unmatched
if (isData == haveData) // can't match
break;
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
ConcurrentLinkedQueue
并发非阻塞队列,每次offer和poll时会进行cas操作,保证并发的原子性。
DelayQueue本质用的是PriorityQueue,只是让加入的对象实现时间上的排序。
示例用到得锁:ReentrantLock
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
写在最后
本文简单介绍了几类并发容器,给了几个示例代码,算是抛砖引玉吧。



