Map是一个Key-Value的结构,在java中是非常常用的数据结构,常用于在内存中存放数据。本篇主要想讨论 ConcurrentHashMap 这样一个并发容器。
ConcurrentHashMapConcurrentHashMap 同 HashMap 一样,底层是基于 数组 + 链表 组成的。在jdk1.6中采用Segment分段锁的方式保证线程安全,在jdk8中则取消了Segment的方式。下面来看下源码进行分析。
构造方法 public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
先看第二个构造方法,只有一个参数为初始化容量。
如果容量大于MAXIMUM_CAPACITY 右移一位 就为 MAXIMUM_CAPACITY ,MAXIMUM_CAPACITY是什么呢,看定义。
private static final int MAXIMUM_CAPACITY = 1 << 30;
否则看 tableSizeFor 方法:
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
先来分析有关n位操作部分:先来假设n的二进制为01xxx…xxx。接着
对n右移1位:001xx…xxx,再位或:011xx…xxx
对n右移2为:00011…xxx,再位或:01111…xxx
此时前面已经有四个1了,再右移4位且位或可得8个1
同理,有8个1,右移8位肯定会让后八位也为1。
综上可得,该算法让最高位的1后面的位全变为1。
最后再让结果n+1,即得到了2的整数次幂的值了。
现在回来看看第一条语句:
int n = cap - 1;
让cap-1再赋值给n的目的是另找到的目标值大于或等于原值。例如二进制1000,十进制数值为8。如果不对它减1而直接操作,将得到答案10000,即16。显然不是结果。减1后二进制为111,再进行操作则会得到原来的数值1000,即8。
tableSizeFor 方法就是得到大于等于c的最小的2^n。
由此可以得出ConcurrentHashMap的最大容量为2^30。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel)
这个构造方法增加了负载因子以及并发数,简单来说就是小于并发数时初始化容量为并发数,否则初始化容量为大于等于initialCapacity/loadFactor的最小的2^n。
put方法接下来看put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key和value不支持null
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node[] tab = table;;) {
Node f; int n, i, fh;
//如果table是空的则进行初始化 (懒加载)
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//对应位置为空直接cas操作赋值,失败则自旋
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果对应槽位不为空,且他的 hash 值是 -1,说明正在扩容,那么就帮助其扩容。以加快速度
//原因为ForwardingNode的构造方法中将hash设置为-1,且ForwardingNode只有在扩容时有初始化动作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//利用 synchronized 锁写入数据 锁的是hash表的头节点
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node e = f;; ++binCount) {
K ek;
//key的hash值相等并且equals为true 值替换 否则一直往后找知道链表尾部追加
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
//如果onlyIfAbsent为true,有key的hash值相等并且equals为true直接返回,put失败
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
if ((e = e.next) == null) {
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
//如果当前就是红黑树就添加到红黑树中并且进行平衡
//TreeBin为hash表的顶级容器,不保存key-value,指向TreeNode根节点
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeval(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//转化为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//会判断是否需要扩容
addCount(1L, binCount);
return null;
}
- 如果hash表为空则进行初始化如果hash表对应位置没有元素直接利用CAS尝试写入,失败自旋如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。如果都不满足,则利用 synchronized 锁hash表对应位置的头节点(相当于锁hash表的每个元素)锁写入数据。如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。
初始化hash表
private final NodeaddCount[] initTable() { Node [] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //让出cpu自旋 //将sc修改为-1 表示正在进行初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n]; table = tab = nt; //相当于sc = 0.75 * sc sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
计数+1
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
Integer.numberOfLeadingZeros(n) //结果为n的二进制前面有多少个连续的0 假设n=16 ,结果为27
(1 << (RESIZE_STAMP_BITS - 1)) // 为 1 << 15 即高16位为0 低16位第一位为1其余为0
resizeStamp(16) = 32795 即 0000 0000 0000 0000 1000 0000 0001 1011
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, baseCOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果CounterCell[]是null
// 或者CounterCell[]不为null的情况下CounterCell[]的长度小于1也就是等于0,
// 或者CounterCell[]长度不为0的情况下计算当前线程的hash为CounterCell[]的下标,并判断此下标位置是否为空
// 或者CounterCell[]中的某下标位置不为null的情况下通过cas修改CounterCell中的值失败了
// 才调用fullAddCount方法,然后返回
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 如果修改CELLVALUE成功了,则统计ConcurrentHashMap的元素个数
s = sumCount();
}
//下面是扩容逻辑的判断
//if (check >= 0)判断put传入的binCount(也就是check),check为-1的时候代表删除操作。通过check的正负判断是否进行扩容操作。
if (check >= 0) {
//sizeCtl为扩容的阈值。 默认为0,初始化是为-1,初始化完成后置为阈值(sc = n - (n >>> 2);)
Node[] tab, nt; int n, sc;
// 如果元素个数大于等于了阈值或-1就自旋扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//rs = 0000 0000 0000 0000 1000 0000 00000000 | n前面0的个数
int rs = resizeStamp(n);
if (sc < 0) {
// 如果全部元素已经转移完了,或者已经达到了最大并发扩容数限制则breack
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果没有,则sizeCtl加1,然后进行转移元素
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//Integer.numberOfLeadingZeros(n) //结果为n的二进制前面有多少个连续的0 假设n=16 ,结果为27
//(1 << (RESIZE_STAMP_BITS - 1)) // 为 1 << 15 即高16位为0 低16位第一位为1其余为0
//resizeStamp(16) = 32795 即 0000 0000 0000 0000 1000 0000 0001 1011
//rs << RESIZE_STAMP_SHIFT) + 2 结果为1000 0000 0001 1011 0000 0000 0000 0010
//rs低16位代表当前参与扩容的线程数+1 如果最高位为1则为负数 说明已经有线程扩容 在helpTransfer中有处理
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
sumCount
获取总元素个数
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
addCount : 通过属性baseCount 和 counterCell数组中所有的元素的和来记录size
fullAddCount
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// 如果counterCells不等于空
if ((as = counterCells) != null && (n = as.length) > 0) {
// h可以理解为当前线程的hashcode,如果对应的counterCells数组下标位置元素当前是空的
// 那么则应该在该位置去生成一个CounterCell对象
if ((a = as[(n - 1) & h]) == null) {
// counterCells如果空闲
// private transient volatile int cellsBusy;
if (cellsBusy == 0) { // Try to attach new Cell
// 生成CounterCell对象
CounterCell r = new CounterCell(x); // Optimistic create
// 再次判断counterCells如果空闲,并且cas成功修改cellsBusy为1
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
// 如果counterCells对象没有发生变化,那么就将刚刚创建的CounterCell赋值到数组中 并标记created (CounterCell创建成功的标志位) 为 true 退出循环
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
// 如果CounterCell创建成功,则退出循环,方法执行结束
if (created)
break;
// 如果没有创建成功,则继续循环
continue; // Slot is now non-empty
}
}
collide = false;
}
// 如果调用当前方法之前cas失败了,那么先将wasUncontended设置为true,第一次进这个方法为false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 通过cas修改CELLVALUE的值,修改成功则退出循环
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// counterCells发生了改变,或者当前counterCells数组的大小大于等于CPU核心数,设置collide为false,
// 如果到了这个极限,counterCells不会再进行扩容了
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
// 扩容条件
else if (!collide)
collide = true;
// 当collide为true进入这个分支,表示发生了碰撞会进行扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
// 对counterCells进行扩容 << 1 二倍扩容
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 重新进行hash
h = ThreadLocalRandom.advanceProbe(h);
}
// 如果counterCells等于空的情况下会走下面两个分支
// cellsBusy == 0表示counterCells没有线程在用
// 如果counterCells空闲,并且当前线程所获得counterCells对象没有发生变化
// 先通过CAS将cellsBusy标记改为1,如果修改成功则证明可以操作counterCells了,
// 其他线程暂时不能使用counterCells
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
// cellsBusy标记改成后就初始化CounterCell[]
if (counterCells == as) {
// 初始化长度为2
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
// 如果没有初始化成功,则证明counterCells发生了变化,当前线程修改cellsBusy的过程中,
// 可能其他线程已经把counterCells对象替换掉了
// 如果初始化成功,则退出循环,方法执行结束
if (init)
break;
}
//如果都不满足 最后还是cas当前map对象 baseCount + x
else if (U.compareAndSwapLong(this, baseCOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
在无竞争的情况下,通过cas当前map对象的baseCount为baseCount + 1,
有竞争情况下,上面的cas失败,会初始化一个长度为2的CounterCell数组,数组会扩容,每次扩容成两倍,每个线程有在counterCell数组中对应的位置(多个线程可能会对应同一个位置), 如果位置上的CounterCell元素为空,就生成一个value为1的元素,如果不为空,则cas当前CounterCell元素的value为value + 1;如果都失败尝试cas当前map对象的baseCount为baseCount + 1
transfer为什么采用了baseCount + CouterCell 数组计数?
cas方式可以解决多线程并发问题,但是同一时刻只有一个线程能cas成功,而对于引入CounterCell数组,cas的是当前线程对应在数组中特定位置的元素,也就是说如果位置不冲突,n个长度的CounterCell数组是可以支持n个线程同时cas成功的。减少竞争
这个方法是扩容的逻辑,更准确的说应该是转移
private final void transfer(NodehelpTransfer[] tab, Node [] nextTab) { int n = tab.length, stride; // stride表示步长,步长最小为16,如果CPU只有一核,那么步长为n // 既如果只有一个cpu,那么只有一个线程来进行扩容 // 步长代表一个线程负责转移的桶的个数 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { //左移一位即2倍扩容 @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME //异常阈值为Integer最大值 (扩容保护) sizeCtl = Integer.MAX_VALUE; return; } //n为扩容之前数组的长度,stride 主要和CPU相关,含义为步长,每个线程在扩容时拿到的长度,最小为16。 //nextTab为新的table,如果nextTab为空就新建一个table,大小为原来的2倍,代表双倍扩容。 nextTable = nextTab; // 因为是两倍扩容,相当于两个老数组结合成了一个新数组,transferIndex表示第二个小数组的第一个元素的下标 transferIndex = n;//n为扩容前的大小。 } int nextn = nextTab.length; //nextn为扩容后数组的大小 //构建一个ForwardingNode用户并发扩容 fwd是一个标志,标明此节点正在进行迁移。当其他线程进行操作的时候发现这个位置存放的是fwd就知道正在进行扩容。 ForwardingNode fwd = new ForwardingNode (nextTab); // advance为true时,当前桶是否已经迁移完成,如果迁移完成则开始处理下一个桶 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node f; int fh; //每次判断 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing)//检查结束条件 advance = false; //<=0表示所有桶都已迁移完成 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 通过cas来修改TRANSFERINDEX,如果修改成功则对bound和i进行赋值 // 第一循环将进入到这里,来赋值bound和i // nextIndex就是transferIndex,假设为64,假如步长为16,那么就分为4个组,每组16个桶 // 0-15,16-31,32-47,48-63 // nextBound = 64-16=48 // i=64-1=63 // 所以bound表示一个步长里的最小的下标,i表示一个步长里的最大下标 // TRANSFERINDEX是比较重要的,每个线程在进行元素的转移之前需要确定当前线程从哪个位置开始(从后往前) // TRANSFERINDEX每次减掉一个步长,所以当下一个线程准备转移元素时就可以从最新的TRANSFERINDEX开始了 // 移动的元素为bount-i // 如果没有修改成功则继续循环 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // i表示一个步长里的最大下标, 如果i小于0或者大于等于老数组长度,或者下标+老数组长度大于等等新数组长度 if (i < 0 || i >= n || i + n >= nextn) { int sc; //转移完成 if (finishing) { nextTable = null; table = nextTab; //设置阈值 等价于 2*n-0.5n= 1.5n 即新容量的0.75 sizeCtl = (n << 1) - (n >>> 1); return; } // 每个线程负责的转移任务结束后利用cas来对sizeCtl减1 代表当前线程已分配移动任务 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 当前线程负责的任务做完了,同时还有其他线程还在做任务,则回到上层重新申请任务来做 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 当前线程负责的任务做完了,也没有其他线程在做任务了,那么则表示扩容结束了 finishing = advance = true; i = n; // recheck before commit } } // 从i位置开始转移元素 // 如果老数组的i位置元素为null,则表示该位置上的元素已经被转移完成了, // 则通过cas设置为ForwardingNode,表示无需转移 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 如果i位置已经是ForwardingNode,则跳过该位置(就是桶) else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 加锁,开始转移 synchronized (f) { // 加锁完了之后再次检查一遍tab[i]是否发生了变化 if (tabAt(tab, i) == f) { Node ln, hn; // fh大于等于0表示是链表 否则走else 即红黑树 fh为桶位置节点的hash值 红黑树TreeBin节点的hash为-2 // static final int TREEBIN = -2; // hash for roots of trees if (fh >= 0) { // n是老数组的长度 // 因为n是2的幂次方数,所以runbit只有两种结果:0和n int runBit = fh & n; // 遍历链表,lastRun为当前链表上runbit连续相同的一小段的最后一段 Node lastRun = f; for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; //不等于就更新 所以是runbit连续相同的最后一部分 if (b != runBit) { runBit = b; lastRun = p; } } // 如果最后一段的runBit为0,则则该段应该保持在当前位置 // 否则应该设置到i+n位置 // 假设hash值为18 hash表长4 10010 & (11) = 10 // 扩容后 10010 & (111) = 2 // 18 对应的 runBit = 0 // 假设hash值为20 hash表长4 10110 & (11) = 10 // 扩容后 10110 & (111) = 2 = 110 // 20 对应的 runBit = 4 // 20 对应扩容后的位置即为 110 即 原有位置 i + n if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //从头节点开始,遍历链表到lastRun结束 for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 如果ph & n,则将遍历到的节点插入到ln的前面 // 否则将遍历到的节点插入到hn的前面 // ph & n == 0 说明不需要移动 //假设 表长4 hash值&表长==0 说明 hash值从低位开始第3位为0 扩容后 & 2*n -1 低位开始第三位同样是0 即不需要移动 if ((ph & n) == 0) ln = new Node (ph, pk, pv, ln); else hn = new Node (ph, pk, pv, hn); } // 将ln链表赋值在新tab的i位置 setTabAt(nextTab, i, ln); // 将hn链表赋值在新tab的i+n位置 setTabAt(nextTab, i + n, hn); // 这是老tab的i位置ForwardingNode节点,表示转移完成 setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin t = (TreeBin )f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin (lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin (hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
这个方法是帮助其他线程进行扩容。添加、删除节点之处都会检测到table的第i个桶是ForwardingNode的话会调用helpTransfer()方法。
ForwardingNode只有在扩容时有初始化,即当前正在扩容
//入参是:成员变量 table 和 对应槽位的 f 变量。
final Node[] helpTransfer(Node[] tab, Node f) {
Node[] nextTab; int sc;
// 如果 table 不是空 且 node 节点是转移类型,数据检验 ForwardingNode只有在扩容时有初始化
// 且 node 节点的 nextTable(新 table) 不是空,同样也是数据校验
// 尝试帮助扩容
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
get
public V get(Object key) {
Node[] tab; Node e, p; int n, eh; K ek;
// 计算hash
int h = spread(key.hashCode());
// 如果hash表不为空 并且长度>0 并且对应槽位有元素
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 匹配hash值
if ((eh = e.hash) == h) {
// 地址相等 或者 equals 返回true 认为key相同 返回对应的value
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash 为负数表示该 bin 在扩容中或是 treebin, 这时调用 find 方法来查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表, 用 equals 比较
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
sizeCtl 属性在各个阶段的作用
新建而未初始化时
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
sizeCtl 用于记录初始容量大小,仅用于记录集合在实际创建时应该使用的大小的作用 。
初始化过程中
U.compareAndSwapInt(this, SIZECTL, sc, -1)
将 sizeCtl 值设置为 -1 表示集合正在初始化中,其他线程发现该值为 -1 时会让出CPU资源以便初始化操作尽快完成 。
初始化完成后
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node,?>[n];
table = tab = nt;
//1.5n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
正在扩容时
//第一条扩容线程设置的某个特定基数 U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2) //后续线程加入扩容大军时每次加 1 U.compareAndSwapInt(this, SIZECTL, sc, sc + 1) //线程扩容完毕退出扩容操作时每次减 1 U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)
sizeCtl 用于记录当前扩容的并发线程数情况,此时 sizeCtl 的值为:((rs << RESIZE_STAMP_SHIFT) + 2) + (正在扩容的线程数) ,并且该状态下 sizeCtl < 0 。详看resizeStamp();



