- 前言
- 为什么要用ConcurrentHashMap?
- ConcurrentHashMap简介
- CAS(Compare and Swap)
- 从putVal()方法起阅读源码
- initTable()——ConcurrentHashMap的初始化
- sizeCtl
- casTabAt()修改tab的元素
- tabAt() 获取tab数组某个索引上节点
- 内存不可见性
- putTreeval() 用于把节点添加进红黑树里
- lockRoot()
- unlockRoot()
- treeifyBin() 把单链转化为红黑树
- tryPresize()自动扩容
- resizeStamp()
- TreeBin() 基于双链形成红黑树
- transfer()执行调整转移
- helpTransfer() 协助调整转移
- addCount() 计数和扩容
- fullAddCount() 实现多线程分片计数
- get()
- ForwardingNode的find()方法
- TreeBin的find()方法
- lockState字段解释
- remove()
- replaceNode()
首先,笔者在研究源码时是基于jdk1.8的,而在之前有一种说法,就是之所以使用ConcurrentHashMap是因为HashMap在多线程下使用put方法有可能导致后面会出现死循环状况,但这个已经在jdk1.8中得到解决,简单的说在jdk1.7的HashMap在对链表进行重新散列的时候,在新的数组是使用头插法,而在jdk1.8中使用的是尾插,并且会分为高位链和低位链,大家可以去看我之前写的分析HashMap的博客,我之前的博客详细分析了红黑树的特性,以及实现高低位链的算法的分析,这在本章将不重复提及。HashMap源码详解(一文掌握所有核心知识),强烈推荐大家先去看!
为什么要用ConcurrentHashMap?如果用HashMap的话,在多线程的环境下会存在线程安全问题(如果多个线程只是读取共享资源,不会存在线程安全问题,至少存在一个线程修改共享资源时才会存在线程安全问题)。所以,要解决线程安全问题,也可以用HashTable来替代,但因为HashTable插入时就会用到synchronized,而这个synchronized它锁的颗粒度太大,把整个表都锁住了,在别的线程插入数据的时候另外的线程就不可读取,为了解决这个问题,ConcurrentHashMap应运而生。
ConcurrentHashMap简介在jdk1.7,ConcurrentHashMap是基于分段锁来实现的,而在jdk1.8中,ConcurrentHashMap则抛弃了分段锁的数据结构,取而代之的是CAS(Compare And Swap)+synchronized,相比于前者后者表现为更细粒度,当然,由于粒度的降低,实现的复杂度也增加了。如果不懂并发编程的知识,比如CAS等,不用担心,笔者会在必要的地方进行讲解。
CAS(Compare and Swap)Compare and Swap,顾名思义“比较和交换”,就是在修改一个数之前先比较和预期的数值是不是一致,然后再更新它(类似于乐观锁)。CAS是jdk提供的非阻塞原子性操作,它通过硬件保证了比较——更新操作的原子性(原子性:执行一系列操作时,这些操作要么全部执行,要么全部不执行)。JDK里的Unsafe类提供了一系列的compareAndSwap方法。下面举一个代表方法
public final native boolean compareAndSwapLong(Object obj, long valueOffset, long expect, long update); //各参数的含义 //对象的内存地址 //对象中变量的偏移量 //变量的预期值 //变量新的值 //操作含义:如果对象obj中内存偏移量为valueOffset的变量值为expect,则使用新的值update替换旧的值。
下面是ConcurrentHashMap的Unsafe机制的代码
// Unsafe mechanics
private static final sun.misc.Unsafe U;
//下面这些字段记录的都是某字段在对象中的偏移量,从而方便调用CAS方法来修改某字段的值
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long baseCOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long Abase;
private static final int ASHIFT;
static {
try {
//在静态代码块中,如果类被加载了,就会执行下面操作,去获取特定字段的偏移量。
U = sun.misc.Unsafe.getUnsafe();
Class> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
baseCOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class> ak = Node[].class;
Abase = U.arraybaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
从putVal()方法起阅读源码
public V put(K key, V value) {
return putVal(key, value, false);
}
点进put方法,我们发现是putVal()起作用,在这里笔者先简单介绍一下putVal()的作用。首先,会通过计算得到要插入元素的key的hash值,如果哈希表为空,则会新建一个tab数组,然后通过容量和哈希值进行取余操作,得到对应的桶的下标,如果该桶上为null,则直接通过CAS来把null换成新的插入的节点,如果该桶上有节点,而且该节点hash值为-1,则说明该节点是一个ForwardingNode节点,这说明哈希表正在进行扩容调整(这个后面会详细说),然后该线程就会调用helpTransfer()方法去帮助转移(扩容时调整的操作)。如果不是ForwardingNode节点,那么说明它就是单个节点、链或者是红黑树。这里会先用synchronized来锁住头节点,确保线程安全(在这里我们也可以看出jdk1.8的ConcurrentHashMap的锁的粒度是很细粒度,它细到了一个桶,而在jdk1.7的分段锁,它一个分段锁对应了多个桶)。然后会判断该节点是普通单链表的链头还是红黑树的根节点,如果是单链表的话,就用普通的for循环遍历,如果中途发现了与要插入元素key值一样的节点,那么直接把新值赋给旧值(要看条件来判断是否覆盖原有的值,也可能是保留旧的),这里不用通过CAS操作来转换值了,因为前面的synchronized以及确保了线程安全性。然后如果头节点是红黑树的根节点的话,那么就调用putTreeval()方法把元素插入到红黑树里面,其中会涉及到保持红黑树平衡的balanceInsertion()方法,这都有在我之前写的分析HashMap源码的文章里详细介绍过,这里就不啰嗦。请大家阅读一下putVal()的源码
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//得到哈希值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node[] tab = table;;) {
Node f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//如果tab数组为空则先初始化一个
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//利用CAS操作,为数组的某个元素赋值
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//这个代表当前的map正在扩容
//遇到了ForwardingNode节点(MOVED为-1)
else if ((fh = f.hash) == MOVED)
//当发现其它线程正在扩容的时候,它会去帮助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//如果对应下标的桶上有元素,先把它锁起来
synchronized (f) {
//确定当前桶上的元素没有被换掉
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node e = f;; ++binCount) {
K ek;
//如果存在的元素f的key值与要插进去的元素的key相同
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
//覆盖掉原来的,或者不覆盖
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
//如果遍历到了链的尾部,则直接插到最尾就行了
if ((e = e.next) == null) {
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
//如果原本存在的元素f是一棵树
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeval(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果单链的长度大于等于8,就要变为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
initTable()——ConcurrentHashMap的初始化
在上面的putVal()方法我们可以看到,如果初始的tab数组为null时,就要调用initTable()这个方法进行初始话,在介绍这个源码之前,先介绍一个ConcurrentHashMap里一个很重要的属性值——sizeCtl。
sizeCtl这个sizeCtl用于哈希表(就是tab数组)的初始化和调整表大小时的控制。
下面是官方注释的翻译:表初始化和调整大小控制。如果为负,则表正在初始化或调整大小:-1 表示初始化,否则 -(1 + 活动调整大小线程的数量)。否则,当 table 为 null 时,保存创建时使用的初始表大小,或默认为 0。初始化后,保存下一个要调整表格大小的元素计数值。但其是 -(1 + 活动调整大小线程的数量)这个说法是有争议的,也在后来对计算和使用sizeCtl的方法中证实了官方这个说法其实是有误的,我们会在后面遇到时再提及到。
//如果不是在初始化或者调整中,默认为table大小的0.75倍,它表示了需要调整表大小的一个阈值,哈希表元素个数达到了这阈值就要调整表的大小了 //当为负数时(-M),那么M-1就代表了有多少个线程在扩容 //这个M是按照低16位来算的(这个后面会解释) private transient volatile int sizeCtl;
然后这个initTable()会根据这个sizeCtl当前哈希表是否正在初始化或者调整大小,从而来决定当前线程还要不要初始化**(因为有别的线程正在初始化,那么当前线程就没必要再初始化)**。下面是源码
private final NodecasTabAt()修改tab的元素[] initTable() { Node [] tab; int sc; while ((tab = table) == null || tab.length == 0) { //如果小于零,则表示有其它线程正在初始化tab或扩容 //如果大于零,则它表示的是下一次要调整的表格的大小(就是下一次要扩容到多大) //一开始为0,默认还没被初始化 if ((sc = sizeCtl) < 0) //申请让出当前线程的cpu执行时间,并且会在while循环中自旋重试 Thread.yield(); // lost initialization race; just spin //把当前的sizeCtl改为-1,表示当前线程正在进行初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //再次确保还没有被初始化 if ((tab = table) == null || tab.length == 0) { //这里默认使用初始化容量'16' int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //下面新建了一个更大的数组 @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n]; table = tab = nt; //n=16,下面sc算出等于12,也就是 0.75*16 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
继续putVal()往下,如果对桶的节点为null,则直接通过casTabAt()来把null换成要插入的节点。这个方法底层通过调用Usafe的方法来完成CAS操作。
static finaltabAt() 获取tab数组某个索引上节点boolean casTabAt(Node [] tab, int i, Node c, Node v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + Abase, c, v); }
//直接从内存中获得数据,防止内存不可见
@SuppressWarnings("unchecked")
static final Node tabAt(Node[] tab, int i) {
//因为table虽然是voliet修饰的,但里面拿到的元素可能也是旧的,为了防止内存不可见的发生,直接用unsafe方法从内存中获取
return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + Abase);
}
内存不可见性
内存不可见性涉及到java的内存模型,像下图
其中L1和L2分别是一级缓存和二级缓存,之所以要搞一个一二级缓存时为了解决寄存器的读写速度比内存快几个数量级的问题。
假如,线程A要获取变量a的值,此时一二级缓存都没有变量a,所以从主内存中读取a,并且在一二级缓存中缓存这个变量,然后线程A把a赋值为1,然后这个修改刷新到一二级缓存和主内存里。然后线程B又读取变量a的值,B的一级缓存里没有a,但二级缓存里有,读到a=1,到此一切都是正常的。然后B重新把a赋值为2,然后重新刷写到一二级缓存和主内存中,但当线程A再次读取变量a的时候,因为在线程A的一级缓存中有变量a,而这个变量a的值是1,所以线程A拿到a的值为1而不是2,所以问题就出现了,线程B改的值对于线程A来说是不可见的。
所以在Unsafe类里有一些方法专门会去读取主内存而不是缓存的值,避免了内存不可见问题。在ConcurrentHashMap里,虽然table是用volatile修饰的,解决了内存不可见问题,但table里的元素却没有解决这个问题,所以还需要通过CAS方法之间访问主内存。
虽然它的逻辑和之前的HashMap大致相同,但因为ConcurrentHashMap做到了在写的时候也能读,所有这个方法相比HashMap来说也是有区别的,下面时源码
final TreeNodelockRoot()putTreeval(int h, K k, V v) { Class> kc = null; boolean searched = false; for (TreeNode p = root;;) { int dir, ph; K pk; if (p == null) { first = root = new TreeNode (h, k, v, null, null); break; } else if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; //如果在树里找到key相等的节点,就返回该节点 else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; //看看节点有没有实现Comparable接口,有点话用其比较大小 else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) { if (!searched) { TreeNode q, ch; searched = true; if (((ch = p.left) != null && (q = ch.findTreeNode(h, k, kc)) != null) || ((ch = p.right) != null && (q = ch.findTreeNode(h, k, kc)) != null)) return q; } //如果通过上诉方法都没有办法判断要插入元素的hash和当前节点的hash的大小,则用下面的方法打破僵局 dir = tieBreakOrder(k, pk); } TreeNode xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { TreeNode x, f = first; first = x = new TreeNode (h, k, v, f, xp); if (f != null) f.prev = x; if (dir <= 0) xp.left = x; else xp.right = x; if (!xp.red) x.red = true; else { //设置锁标记 //这个时关键点,因为balanceInsertion()方法可能为了保持平衡而改变红黑树的结构 //如果在结构改变的时候有其它线程进行读取的话,是可能发生异常的 lockRoot(); try { //在插入后保持对树的平衡 root = balanceInsertion(root, x); } finally { unlockRoot(); } } break; } } assert checkInvariants(root); return null; }
//改变锁的状态,标记为'写'状态
private final void lockRoot() {
if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
//如果CAS写锁失败,则继续去竞争
contendedLock(); // offload to separate method
}
unlockRoot()
//把'写'状态给取消掉
private final void unlockRoot() {
lockState = 0;
}
treeifyBin() 把单链转化为红黑树
然后在putVal()方法继续往下,碰到hash为-1的ForwardingNode节点对应的处理后面再说,接下来是对单链和树的处理逻辑上与HashMap没有区别,最后如果单链的长度大于等于8,就要把单链转化为红黑树。
这个方法和HashMap中单链转红黑树的方法逻辑大概一样,都是先把单链遍历一次,把它们分开为两条低位和高位的链,然后调用TreeBin的构造方法,在双链的基础上形成红黑树,与HashMap的区别是多了一个用synchronized上锁的效果,下面是它的源码
private final void treeifyBin(NodetryPresize()自动扩容[] tab, int index) { Node b; int n, sc; if (tab != null) { //如果容量小于64,则还不能建树,先扩容 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //新的容量为两倍旧容量 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { //再次确定index位置的节点还是不是原来的b节点 if (tabAt(tab, index) == b) { TreeNode hd = null, tl = null; //下面把单链进化成由TreeNode节点组成的双链 for (Node e = b; e != null; e = e.next) { TreeNode p = new TreeNode (e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } //往index的桶里放入TreeBin节点 //所以说,TreeBin节点是在单链变树时就放入的 setTabAt(tab, index, new TreeBin (hd)); } } } } }
在建立红黑树的时候,如果tab的容量小于64的话,是要先扩容的,而这个tryPresize()里会涉及很多位运算,不过不用慌,笔者会一一解释,下面先放源码
private final void tryPresize(int size) {
//如果大于2的29次幂,则c被复以MAXIMUM_CAPACITY(2的30次方)
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
//下面这里返回一个二次幂的数给c,为了确保容量是2的幂次
tableSizeFor(size + (size >>> 1) + 1);
int sc;
//sizeCtl大于等于0,表示tab要调整大小的阈值
//并且也表明当前没有线程在扩容或者初始化
while ((sc = sizeCtl) >= 0) {
Node[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
//设置初始化标志,设为-1表示当前线程正在初始化
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node,?>[n];
table = nt;
//下面这里相当于 n*0.75
//它表示的是下一个要调整表格大小的时候要达到的阈值(容量*负载因子)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
//如果还没有达到扩容的条件(c <= sc),或者tab的容量已经超过最大的容量了
//这个时候就不能扩容,break退出
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//确保当前的table还是原来的
else if (tab == table) {
//返回用于调整大小为 n 的表的标记位(这个方法下面有解释)
int rs = resizeStamp(n);
//小于零表示当前有线程正在初始化或调整表的大小
//这里说明已经有别的线程在进行扩容
if (sc < 0) {
Node[] nt;
//可以先看下面的代码,再回过头看这里
//如果sc右移动16位得到的就是它之前resizeStamp出来的值,如果此时它不等于rs,则说明标识符变了
//或者扩容就结束了,因为第一个扩容线程会把rs左移16位然后加上2,而现在sc == rs + 1,扩容结束
//或者扩容线程超过了最大调整线程数
//或者转移下标小于等于0(扩容结束)
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//rs左移了16位,本来int有32个位,rs第16个位已经是1了,左移16位的话就变成了负数
//左移后,前16位就包含了最高位的1加上n前面的零的个数(看下面对resizeStamp的解释)
//所以sizeCtl被设为了负数,这是合理的,因为sizeCtl为负数时代表有线程正在调整或初始化
//因为在这里要表示的是扩容,而不是初始化,所以设为-1是不合理的,要设为-(M+1),M代表正在扩容调整的线程数,因为"rs << RESIZE_STAMP_SHIFT) + 2",所以M就为1,表示正在调整扩容的线程数为1
//而按照官方对sizeCtl的解释中,rs左移16其实是一个很小的负数,如果按照-(M+1),那么久会被认为有很多正在调整的线程数,这是不合理的,所以-(M+1)只针对于sizeCtl的低16位,而不考虑高位,从而可以解释了rs << RESIZE_STAMP_SHIFT) + 2的由来,代表了正在扩容调整的线程数为1
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
resizeStamp()
这个方法把一个数字1左移了15位,也就是说32位的int它的第16位是一个1,然后在与一个n的最高位到第一个非零位之间的零的个数进行与运算,因为0的个数很少,不可能去到16位(二进制)那么多,所以下面这个方法可以看成左右两边的数相加。
static final int resizeStamp(int n) {
//得到前面有多少个0
//这个相当于相加操作
//右边把1左移15位
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
TreeBin() 基于双链形成红黑树
这是一个构造方法,它会给桶的头节点返回一个TreeBin节点,所以在ConcurrentMap中,红黑树的头节点是TreeBin
TreeBin(TreeNodetransfer()执行调整转移b) { super(TREEBIN, null, null, null); this.first = b; TreeNode r = null; //遍历TreeNode链表并形成一棵红黑树 for (TreeNode x = b, next; x != null; x = next) { next = (TreeNode )x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; } else { K k = x.key; int h = x.hash; Class> kc = null; for (TreeNode p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; //通过这个方法来保持红黑树的平衡性 r = balanceInsertion(r, x); break; } } } } this.root = r; assert checkInvariants(root); }
这个方法被设计得很精妙,这个方法可以让多个线程共同协作,在tab扩容的时候将元素从旧数组里转移到新的数组里,从而发挥了多核cpu的性能,简单的说就是把旧的tab数组划分为几块,然后每个线程负责一块,然后把它们自己负责那块的元素重新转移到新的数组里,下面看源码。
/将节点移动到新的table中 private final void transfer(NodehelpTransfer() 协助调整转移[] tab, Node [] nextTab) { int n = tab.length, stride; //这里stride代表了每个线程能够转换的桶的数量 //如果是单核cpu,线程不能并行执行,stride会被设为tab容量,也就是桶的数量,所以所有的转换操作由单个线程执行 //如果是多核cpu,这时候就可以给多个线程分配不同的桶区域各自并行执行自己的转换工作,但涉及内存争用问题,每个线程至少管16个桶(MIN_TRANSFER_STRIDE)的转移工作 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //如果传入的新table为空,则初始化 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n << 1]; //新table容量为旧的两倍 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //把转换索引设为tab容量,这个字段很重要,索引值小于它的桶,都属于为未分配给线程来执行转移的桶 //所以下面会把索引小于transferIndex的桶分配给一个或多个线程来让它们自己执行转移节点操作 //而每分配了一定范围的桶后,transferIndex会减小,其减小的量就等于其分配给线程管理的桶的数目。 transferIndex = n; } int nextn = nextTab.length; //ForwardingNode的hash值被赋为MOVED,即-1 //先new一个这个对象,因为后面会用到 ForwardingNode fwd = new ForwardingNode (nextTab); boolean advance = true; boolean finishing = false; //来标记是不是所有的节点都完成了转移 for (int i = 0, bound = 0;;) { Node f; int fh; //线程每次处理完一个节点,advance会被重新设为true,然后重走该循环 //这个while循环是分配不同线程管理不同范围的桶的关键 while (advance) { //循环中分配每个线程要参与转移的范围 int nextIndex, nextBound; //如果该线程的i以及小于了它管理的桶的范围内的最低索引值,则说明它的任务完成了 if (--i >= bound || finishing) advance = false; //同样,如果transferIndex字段值已经小于等于0了,说明所有的桶都已经分配给了其它线程负责转移了,已经没有当前线程什么事了,所以当前线程不参与协助转移 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //通过CAS更新transferIndex的值 //通过下面的操作为当前线程分配它要管理的桶(它要负责把桶里的元素转换到nextTab中) else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound;//当前线程可以处理的最小当前区间下标 i = nextIndex - 1;//当前线程可以处理的当前区间的最大下标 advance = false; } } //判断整个扩容是否已经完成 //以及判断当前线程扩容任务是否完成 if (i < 0 || i >= n || i + n >= nextn) { int sc; //如果所有节点都完成了转移,那么直接把table设置为新的table(nextTab) //并且把原有的nextTable清空 if (finishing) { nextTable = null; table = nextTab; //相当于新的table容量乘以0.75( 2*n*0.75 ) sizeCtl = (n << 1) - (n >>> 1); return; } //整个扩容未完成,但当前线程完成了,所有把正在扩容的线程数减1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //若sc的值已更新,证明有别的线程完成了整个扩容任务 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //如果旧的tab的i上的节点为空时,则通过CAS在那里放置ForwardingNode节点 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { //防止在还没有上锁前,别的线程把tab[i]上的节点给换掉了 if (tabAt(tab, i) == f) { Node ln, hn; //这里执行对链表的转移 if (fh >= 0) { int runBit = fh & n; Node lastRun = f; //遍历一次链表 for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //形成一条低位链和高位链 for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node (ph, pk, pv, ln); else hn = new Node (ph, pk, pv, hn); } //把高低位链放到新的tab中 //并且把旧tab中旧链存在的地方换为ForwardingNode节点 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } //下面对红黑树转移 else if (f instanceof TreeBin) { TreeBin t = (TreeBin )f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; //红黑树的节点之间形成了一条双链表 //同样的把该双链表分为低位链和高位链 for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //如果链的长度小于等于6,则不满足建树的条件,执行untreeify把双链表退化成单链表 //否则把高低位的双链重新构成两颗红黑树,然后分别把它们的根节点放到新的tab对应的下标上 //untreeify逻辑与HashMap相同,可以看看我上一篇博客写的HashMap源码分析 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin (lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin (hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
回过头来看putVal()方法,我现在知道了,如果要插入的桶里有一个hash=-1的ForwardingNode节点,则说明有其它线程在调整转移旧tab的节点到新的tab,所以当前线程会调用helpTransfer()方法来协助其它线程一起调整。我们发现它还是最终调用了transfer(tab, nextTab)方法参与扩容。
final NodeaddCount() 计数和扩容[] helpTransfer(Node [] tab, Node f) { Node [] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode )f).nextTable) != null) { int rs = resizeStamp(tab.length); //假如正在扩容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //如果sc右移动16位得到的就是它之前resizeStamp出来的值,如果此时它不等于rs,则说明标识符变了 //或者扩容就结束了,因为第一个扩容线程会把rs左移16位然后加上2,而现在sc == rs + 1,扩容结束了 //或者扩容线程超过了最大调整线程数 //或者转移下标小于等于0(扩容结束) if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //增加一个调整tab的线程 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
在putVal()方法的最后,有一个addCount()方法,它和treeifyBin()方法都可以触发扩容。这个addCount()主要就是计算tab上使用的节点的数目,然后看是否达到了扩容的阈值(sizeCtl),如果达到了,就尝试进行扩容。为什么要用一个counterCells而不单单用一个baseCount来计算数值呢,这是因为在多线程环境下,用baseCount进行累加赋值存在线程安全问题,**而如果通过锁或者CAS的方式的话,如果有多个线程并且添加元素频繁的话,会造成过多的阻塞或者过多的自旋重试。**而counterCells是一个数组,每个线程对应其中一个桶(分片),通过分片来记录数值,最后只要遍历它们并把总数加起来就可以。其实一开始是会对baseCount进行CAS累加,但如果发生了失败(发生了线程竞争),那么就会对counterCells数组进行初始化,的下面是它的源码。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//如果counterCells不等于空
//或者对baseCount的CAS失败了
if ((as = counterCells) != null ||
//这里会尝试对baseCount进行CAS累加
!U.compareAndSwapLong(this, baseCOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//如果counterCells为空
//或者counterCells长度-1小于零
//或者counterCells指定的索引上的元素为空
//或者对counterCells指定索引元素的CAS操作失败
if (as == null || (m = as.length - 1) < 0 ||
//这里获得的是线程的哈希探针,通过这种方法将线程与counterCells的某个桶联系起来
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
//把counterCells里面的数和baseCount的值加起来求总数
s = sumCount();
}
if (check >= 0) {
Node[] tab, nt; int n, sc;
//如果tab元素个数大于了调整容量,并且当前tab不等于null且长度小于最大容量,则进行扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//有线程正在扩容
if (sc < 0) {
//如果sc右移动16位得到的就是它之前resizeStamp出来的值,如果此时它不等于rs,则说明标识符变了
//或者扩容就结束了,因为第一个扩容线程会把rs左移16位然后加上2,而现在sc == rs + 1,扩容结束
//或者扩容线程超过了最大调整线程数
//或者转移下标小于等于0(扩容结束)
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
fullAddCount() 实现多线程分片计数
这个方法主要实现了counterCells的初始化及其扩容和将counterCells里的桶与对应线程关联起来,主要通过线程的探针哈希,下面是它的源码。
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//获取当前线程的探针哈希值,如果为0,则进行初始化
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
//因为重新设置了探针哈希值,所以未冲突标记设置为true
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//自旋重试
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
//构建一个CounterCell对象,封装了x的值
CounterCell r = new CounterCell(x); // Optimistic create
//设置cellsBusy值,保证只有一个线程能操作
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//在这里把r对象赋到数组对应的下标上
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//如果竞争失败了,重新设置为成功,这样可以在rehash后继续重试
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//如果指定下标的值不为空,则直接在原来的基础上加计数量
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//如果有别的线程进行了扩容,并且容量大于了cpu核心数
else if (counterCells != as || n >= NCPU)
//把当前counterCells标记为旧的,而且不执行下面扩容
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
//在这里实现了对counterCells的扩容
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//更改当前线程的哈希探针,达到rehash的效果,从而避免与其它线程在counterCells数组中发生冲突
h = ThreadLocalRandom.advanceProbe(h);
}
//对counterCells进行初始化,把CELLSBUSY的标志从0变为1
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, baseCOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
get()
看完了put方法,下面来看看get方法,研究一下它是如果做到在某个桶在写的时候,也能读,下面是源码
public V get(Object key) {
Node[] tab; Node e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
//如果元素的key值与要找的相同,则直接返回该元素
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来
//eh=-1,说明该节点是一个ForwardingNode,正在迁移,此时调用ForwardingNode的find方法去nextTable里找。
//eh=-2,说明该节点是一个TreeBin,此时调用TreeBin的find方法遍历红黑树,由于红黑树有可能正在旋转变色,所以find里会有读写锁。
//eh>=0,说明该节点下挂的是一个链表,直接遍历该链表即可。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//通过遍历链表查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
里面的e.find()方法是有子类去重写的
ForwardingNode的find()方法NodeTreeBin的find()方法find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node [] tab = nextTable;;) { Node e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; //如果找到key值一样的节点,则返回该值 if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode )e).nextTable; continue outer; } else //对TreeBin节点进行(红黑树)查找 return e.find(h, k); } //如果e.next==null,说明链表到尾了,没找到元素。 if ((e = e.next) == null) return null; } } }
通过查看这个方法,我们就可以得知为什么在桶一个桶可以同时进行读和写的操作
final NodelockState字段解释find(int h, Object k) { if (k != null) { for (Node e = first; e != null; ) { int s; K ek; //锁被标记为写锁或等锁,在其它线程占用的情况下(有可能其它线程正在改变树的结构),继续保持线性搜索 //因为红黑树节点之间维护了双链表 //看不懂可以看下面,有位运算解释 if (((s = lockState) & (WAITER|WRITER)) != 0) { if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; e = e.next; } //将锁标志为读锁 else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) { TreeNode r, p; try { p = ((r = root) == null ? null : //通过二叉树的特性进行查找 r.findTreeNode(h, k, null)); } finally { Thread w; //getAndAddInt获得的是旧的值 //如果当前的既有读锁也有等待写锁,并且等待的线程不为空 if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER|WAITER) && (w = waiter) != null) //唤醒该线程 LockSupport.unpark(w); } return p; } } } return null; }
这个字段存在于TreeBin中
static final class TreeBinextends Node { ...... volatile int lockState; // values for lockState static final int WRITER = 1; // 保持写锁时设置 static final int WAITER = 2; // 等待写锁时设置 static final int READER = 4; // 设置读锁的增量值 ...... }
我们发现上面几个标识是有规律的,它们的二进制位分别是
00000001 00000010 00000100
我们发现它们是相互错开的,所以这很方便位运算,例如WAITER|WRITER得出
00000011
如果 (s = lockState) & (WAITER|WRITER)) != 0
那么后lockState两个低位至少有一个为1,那么翻译过来就是lockState标记为了写锁或者等待锁或者两个都有
remove()下面我们来了解一下移除元素的方法
public V remove(Object key) {
return replaceNode(key, null, null);
}
replaceNode()
这个才是执行真正移除的方法,其实大部分逻辑是和putVal()方法相似的,然后其中调用的很多方法也是和之前类似的,就不做过多解释了
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node[] tab = table;;) {
Node f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
else if ((fh = f.hash) == MOVED)
//协助转移
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
//对要操作的桶加锁
//确保不会有另一个线程执行写或移除的操作
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
//这里对链进行操作
for (Node e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
//删除单个节点
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
//对红黑树进行删除操作
else if (f instanceof TreeBin) {
validated = true;
TreeBin t = (TreeBin)f;
TreeNode r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}



