JDK1.7之前的ConcurrentHashMap使用分段锁机制实现类似分散热点的思想,以减少热点域的冲突,最大并发度受Segment的个数限制。
ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个Hashtable;这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。因此,ConcurrentHashMap在多线程并发编程中可是实现多线程put操作
它维护了一个 segment 数组,每个 segment 对应一把锁
- 优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 jdk8 中是类似的
- 缺点:Segments 数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化
initialCapacity: 初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。
loadFactor: 负载因子,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的
concurrencyLevel:并发级别 默认是 16。ConcurrentHashMap 有 16 个 Segments, 最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以改变的。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// ssize 必须是 2^n, 即 2, 4, 8, 16 ... 表示了 segments 数组的大小
int sshift = 0;
int ssize = 1;
// 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// segmentShift 移位数默认是 32 - 4 = 28
this.segmentShift = 32 - sshift;
// segmentMask 掩码默认是 15 即 0000 0000 0000 1111
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// initialCapacity 是设置整个 map 初始的大小,
// 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小
// 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
//插入一个元素不至于扩容,插入第二个的时候才会扩容
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 创建 segments and segments[0]
Segment s0 =
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, Sbase, s0); //往数组写入 segment[0]
ordered write of segments[0]
this.segments = ss;
}
可以看到 ConcurrentHashMap 没有实现懒惰初始化,空间占用不友好
- Segment 数组长度为 16,不可以扩容
- Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是以后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
其中 this.segmentShift 和 this.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment
例如,根据某一 hash 值求 segment 位置,先将高位向低位移动 this.segmentShift 位
结果再与 this.segmentMask 做&运算,最终得到 1010 即下标为 10 的 segment
public V put(K key, V value) {
Segment s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 计算出 segment 下标
// hash 是 32 位,无符号右移 segmentShift(28) 位,剩下高 4 位,和 segmentMask(15) 做一次与操作,也就是说 j 是 //hash 值的高 4 位,也就是槽的数组下标
int j = (hash >>> segmentShift) & segmentMask;
// 获得 segment 对象, 判断是否为 null, 是则创建该 segment
if ((s = (Segment)UNSAFE.getObject
(segments, (j << SSHIFT) + Sbase)) == null) {
// 这时不能确定是否真的为 null, 因为其它线程也发现该 segment 为 null,
// 因此在 ensureSegment 里用 cas 方式保证该 segment 安全性
s = ensureSegment(j);
}
// 进入 segment 的put 流程
return s.put(key, hash, value, false);
}
segment 继承了可重入锁(ReentrantLock),它的 put 方法为
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 尝试加锁
HashEntry node = tryLock() ? null :
// 如果不成功, 进入 scanAndLockForPut 流程
// 如果是多核 cpu 最多 tryLock 64 次, 进入 lock 流程
// 在尝试期间, 还可以顺便看该节点在链表中有没有, 如果没有顺便创建出来
scanAndLockForPut(key, hash, value);
// 执行到这里 segment 已经被成功加锁, 可以安全执行
V oldValue;
try {
HashEntry[] tab = table;
//求应该放置的数组下标
int index = (tab.length - 1) & hash;
//first 是数组该位置处的链表的表头
HashEntry first = entryAt(tab, index);
for (HashEntry e = first;;) {
if (e != null) {
// 更新
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头
// 1) 之前等待锁时, node 已经被创建, next 指向链表头
if (node != null)
node.setNext(first);
else
// 2) 创建新 node
node = new HashEntry(hash, key, value, first);
int c = count + 1;
// 3) 如果超过了该 segment 的阈值,这个 segment 需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// 没有达到阈值,将 node 放到数组 tab 的 index 位置,即将 node 作为链表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
put和remove之间根据ReentrantLock独占锁
put和get之间,put在修改hashEntry元素引用时使用unsafe#putOrderedObject,get使用unsafe#getObjectVolatile,rehash时先创建新数组,最后在一步赋值到table数组,table是通过volatile来保证可见性的
remove和get,如果删除的是头结点,remove通过unsafe来操作数组内部元素;如果不是头结点,它通过设置待删节点的next为前置节点,这里的并发性保证是next是volatile的
ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。
这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。
private SegmentensureSegment(int k) { final Segment [] ss = this.segments; long u = (k << SSHIFT) + Sbase; // raw offset Segment seg; if ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { // 这里看到为什么之前要初始化 segment[0] 了, // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k] // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了 Segment proto = ss[0]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 内部的数组 HashEntry [] tab = (HashEntry [])new HashEntry[cap]; if ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { // 再次检查一遍该槽是否被其他线程初始化了。 Segment s = new Segment (lf, threshold, tab); // 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出 while ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
在往某个 segment 中 put 的时候,首先会调用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是说先进行一次 tryLock() 快速获取该 segment 的独占锁,如果失败,那么进入到 scanAndLockForPut 这个方法来获取锁。
private HashEntryrehash扩容流程scanAndLockForPut(K key, int hash, V value) { HashEntry first = entryForHash(this, hash); HashEntry e = first; HashEntry node = null; int retries = -1; // negative while locating node // 循环获取锁 while (!tryLock()) { HashEntry f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node // 进到这里说明数组该位置的链表是空的,没有任何元素 // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置 node = new HashEntry (hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else // 顺着链表往下走 e = e.next; } // 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁 // lock() 是阻塞方法,直到获取锁后返回 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && // 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头 // 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法 (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
发生在 put 中,因为此时已经获得了锁,因此 rehash 时不需要考虑线程安全.segment 数组不能扩容,扩容是 segment 数组某个位置内部的数组 HashEntry
private void rehash(HashEntryget 流程node) {//node扩容后需要添加到新的数组中的数据。 HashEntry [] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1;//容量为原来的 2 倍 threshold = (int)(newCapacity * loadFactor); HashEntry [] newTable = (HashEntry []) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) {//将原数组位置 i 处的链表拆分到新数组位置 i 和 i+oldCap 两个位置 HashEntry e = oldTable[i];//链表的第一个元素 if (e != null) { HashEntry next = e.next; //计算应该放置在新数组中的位置, int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry lastRun = e; int lastIdx = idx; // 过一遍链表, 尽可能把 rehash 后 idx 不变的节点重用 for (HashEntry last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // 这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中 for (HashEntry p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry n = newTable[k]; newTable[k] = new HashEntry (h, p.key, v, n); } } } } // 扩容完成, 才加入新的节点 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; // 替换为新的 HashEntry table table = newTable; }
get 时并未加锁,用了 UNSAFE 方法保证了可见性,扩容过程中,get 先发生就从旧表取内容,get 后发生就从新表取内容
public V get(Object key) {
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
int h = hash(key);
// u 为 segment 对象在数组中的偏移量
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + Sbase;
// s 即为 segment,根据 hash 找到对应的 segment
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//找到segment 内部数组相应位置的链表,遍历
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + Tbase);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
size 计算流程
- 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回
- 如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回
public int size() {
//尝试几次,以获得准确的计数。在因表连续不一致更改而失败时,采用锁定。
final Segment[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // 第一次迭代不是重试
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
// 超过重试次数, 需要创建所有 segment 并加锁
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
并发扩容死链
public class TestDeadlink {
public static void main(String[] args) {
// 测试 java 7 中哪些数字的 hash 结果相等
System.out.println("长度为16时,桶下标为1的key");
for (int i = 0; i < 64; i++) {
if (hash(i) % 16 == 1) {
System.out.println(i);
}
}
System.out.println("长度为32时,桶下标为1的key");
for (int i = 0; i < 64; i++) {
if (hash(i) % 32 == 1) {
System.out.println(i);
}
}
// 1, 35, 16, 50 当大小为16时,它们在一个桶内
final HashMap map = new HashMap();
// 放 12 个元素
map.put(2, null);
map.put(3, null);
map.put(4, null);
map.put(5, null);
map.put(6, null);
map.put(7, null);
map.put(8, null);
map.put(9, null);
map.put(10, null);
map.put(16, null);
map.put(35, null);
map.put(1, null);
System.out.println("扩容前大小[main]:"+map.size());
new Thread() {
@Override
public void run() {
// 放第 13 个元素, 发生扩容
map.put(50, null);
System.out.println("扩容后大小[Thread-0]:"+map.size());
}
}.start();
new Thread() {
@Override
public void run() {
// 放第 13 个元素, 发生扩容
map.put(50, null);
System.out.println("扩容后大小[Thread-1]:"+map.size());
}
}.start();
}
final static int hash(Object k) {
int h = 0;
if (0 != h && k instanceof String) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}
}
究其原因,是因为在多线程环境下使用了非线程安全的 map 集合
JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能
够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)
并发扩容时,同时执行扩容方法,如果原始链表相邻的两个元素,扩容后仍是相邻的两个元素,由于采用了头插入,会造成两个元素形成互为首尾,形成死循环
JDK17 的源码和JDK8 一样没有进行更改
与HashMap类似的数组+链表+红黑树的方式实现,而加锁则采用CAS和synchronized实现
数据结构jdk8 的锁是加在链表的头结点上的
ConcurrentHashMap内部维护了一个Node类型的数组,也就是table:
transient volatile Node[] table;
数组的每一个位置table[i]代表了一个桶,当插入键值对时,会根据键的hash值映射到不同的桶位置,table一共可以包含4种不同类型的桶:Node、TreeBin、ForwardingNode、ReservationNode。
链接到table[i]——桶上的结点就是Node结点。
当出现hash冲突时,Node结点会首先以链表的形式链接到table上,当结点数量超过一定数目时,链表会转化为红黑树
TreeNode是红黑树的结点,TreeNode不会直接链接到table[i]——桶上面,而是由TreeBin链接,TreeBin会指向红黑树的根结点
TreeBin相当于TreeNode的代理结点。 相当于封装了TreeNode的容器,提供针对红黑树的转换操作和锁控制,会在一定时机挂载到table[i]桶上面
TreeBin 继承Node
TreeNoderoot; // 红黑树结构的根结点 volatile TreeNode first; // 链表结构的头结点 volatile Thread waiter; // 最近的一个设置WAITER标识位的线程 volatile int lockState; // 整体的锁状态标识位 static final int WRITER = 1; // 二进制001,红黑树的写锁状态 static final int WAITER = 2; // 二进制010,红黑树的等待获取写锁状态 static final int READER = 4; //二进制100,红黑树的读锁状态,读可以并发,每多一个读线程,lockState都加上一个READER值
而treeNode 也继承与node,说明红黑树的节点既有树的特点又有链表的特点。所以这颗树本身是一个树和链表的结合体,既可以当链表用又可以当树用。这为以后get获取时存在写锁或者等待获取写锁的线程以链表的形式获取数据做支撑。
//默认为 0 //当初始化时, 为 -1,当初始化或扩容完成后,为下一次的扩容的阈值大小 //当扩容时, 为 -(1 + nThread):sizeCtl < 0 && sizeCtl != -1 时,这时候sizeCtl 的高 16 位就表示扩容标识戳,低 16 位表示参与并发扩容线程数+1:1 + nThread, 即当前参与并发扩容的线程数量为 n 个。 private transient volatile int sizeCtl; //整个ConcurrentHashMap 就是一个 Node[] static class Node常量字段implements Map.Entry {} //hash表table transient volatile Node [] table; //扩容时的新hash 表 private transient volatile Node [] nextTable; //基本计数器值,主要在没有竞争时使用,计数将加到该变量上,但也作为表初始化期间的回退。通过CAS更新。 private transient volatile long baseCount; //扩容时需要用到的一个下标变量.散链表数据迁移到新散链表的进度!迁移工作进度是从高位桶开始,一直迁移到下标是 0 的桶位。 private transient volatile int transferIndex //计数数组,出现并发冲突时使用。 //如果使用CAS计算失败,也就是说当前处于高并发的情况下,那么 //就会使用CounterCell[]数组进行计数,类似LongAdder的cells数组分段锁的形式,锁住一个segment //最后size()方法统计出来的大小是baseCount和counterCells数组的总和 private transient volatile CounterCell[] counterCells; //自旋锁标识位,用于CounterCell[]扩容时使用或者creating CounterCells.。类似于LongAdder的cellsBusy变量 //cellsBusy=0表示counterCells不在初始化或者扩容状态下 private transient volatile int cellsBusy; //ForwardingNode结点仅仅在扩容时才会使用。hash值固定为-1,且不存储实际数据。扩容时如果某个bin迁移完毕, 用 //ForwardingNode 作为旧 table bin 的头结点 //如果其他线程发现此位置有ForwardingNode,表示此下标位置已经扩容处理了,不会对此位置进行操作 //如果其他线程在此时get()方法发现此位置是ForwardingNode,则从扩容后的新的链表查找获取 //如果写操作碰见它时,则尝试帮助扩容。 static final class ForwardingNode extends Node {} // 保留结点.hash值固定为-3,不保存实际数据用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node static final class ReservationNode extends Node {} // 作为红黑树的treebin 的头节点, 存储 root 和 first static final class TreeBin extends Node {} // 作为 treebin 的节点, 存储 parent, left, right 的节点 static final class TreeNode extends Node {}
private static final int MAXIMUM_CAPACITY = 1 << 30; private static final int DEFAULT_CAPACITY = 16; static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; private static final float LOAD_FACTOR = 0.75f; static final int TREEIFY_THRESHOLD = 8; static final int UNTREEIFY_THRESHOLD = 6; static final int MIN_TREEIFY_CAPACITY = 64; private static final int MIN_TRANSFER_STRIDE = 16; private static int RESIZE_STAMP_BITS = 16; private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; static final int MOVED = -1; // 标识ForwardingNode结点(在扩容时才会出现,不存储实际数据) static final int TREEBIN = -2; // 标识红黑树的根结点 static final int RESERVED = -3; // 标识ReservationNode结点() static final int HASH_BITS = 0x7fffffff; // 可用的普通节点哈希值 static final int NCPU = Runtime.getRuntime().availableProcessors();操作node节点的重要方法
// 获取 Node[] 中第 i 个 Node static final构造器Node tabAt(Node [] tab, int i) // cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值 static final boolean casTabAt(Node [] tab, int i, Node c, Node v) // 直接修改 Node[] 中第 i 个 Node 的值, v 为新值 static final void setTabAt(Node [] tab, int i, Node v)
懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, LOAD_FACTOR, 1);
}
}
//初始容量;负载因子,并发度
//concurrencyLevel只是为了兼容JDK1.8以前的版本,并不是实际的并发级别,loadFactor也不是实际的负载因子
//这两个都失去了原有的意义,仅仅对初始容量有一定的控制作用
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // 初始容量小于并发度
initialCapacity = concurrencyLevel; // 改为并发度
//计算下一次扩容时table的大小,并没有创建
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// tableSizeFor 仍然是保证计算的大小是 2^n, 即 16,32,64 ...
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
get 流程
- 如果table[i]的key和待查找key相同,那直接返回;
- 如果table[i]对应的结点是特殊结点(hash值小于0),则通过find方法查找;
- 如果table[i]对应的结点是普通链表结点,则按链表方式查找。
public V get(Object key) {
Node[] tab; Node e, p; int n, eh; K ek;
int h = spread(key.hashCode());// spread 方法能确保返回结果是正数
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果头结点已经是要查找的 key
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash 为负数表示该 bin 在扩容中或是 treebin, 这时调用 find的子类方法来查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 普通正常遍历链表, 用 equals 比较
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
//当槽table[i]被普通Node结点占用,说明是链表链接的形式,直接从链表头开始查找:
Node find(int h, Object k) {
Node e = this;
if (k != null) {
do {
K ek;
if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
final Node find(int h, Object k) {
if (k != null) {
for (Node e = first; e != null; ) {
int s;
K ek;
if (((s = lockState) & (WAITER | WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next; // 链表形式
}
// 当读线程在读取数据时,会使用 CAS 的方式将 LOCKSTATE 值 +4(表示加了读锁),读取数据完毕后,再使用CAS 的方式将 LOCKSTATE 值 -4。这说明读锁的叠加只能是4或4的倍数
else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) {
TreeNode r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));//从红黑树的内部节点查找数据
} finally {
Thread w;
// 如果当前线程是最后一个读线程,且有写线程因为读锁而阻塞,则写线程,告诉它可以尝试获取写锁了
if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER | WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
Node find(int h, Object k) {
//循环以避免在forwarding nodes 上的深度递归,可以看到是在新的哈希表上面查找
outer:
for (Node[] tab = nextTable; ; ) {
Node e;
int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (; ; ) {
int eh;
K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode) e).nextTable;
continue outer;
} else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
//在ReservationNode查找:ReservationNode是保留结点,不保存实际数据,所以直接返回null:
Node find(int h, Object k) {
return null;
}
put 流程
以下数组简称(table),链表简称(bin)
public V put(K key, V value) {return putVal(key, value, false);}
//onlyIfAbsent= true 只有第一次才put,若发现map里有则不会用新值覆盖就值
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key value 不能都为null
if (key == null || value == null) throw new NullPointerException();
// 其中 spread 方法会综合高位低位, 具有更好的 hash 性,保证hash码为正整数
int hash = spread(key.hashCode());
int binCount = 0;
for (Node[] tab = table;;) {
// f 是链表头节点
// fh 是链表头结点的 hash
// i 是链表在 table 中的下标
Node f; int n, i, fh; K fk; V fv;
// 要创建 table
if (tab == null || (n = tab.length) == 0)
tab = initTable();// 初始化 table
// 如果数组该位置桶位为空,用 CAS 操作将这个新值放入其中即可,如果 CAS 失败,那就是有并发操作
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// cas添加链表头
if (casTabAt(tab, i, null, new Node(hash, key, value)))
break;
}
// 帮忙扩容--判断头结点是否为ForwardingNode,true则其他线程正在扩容:
//则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);// 当前线程锁住当前链表帮忙其他线程扩容;帮忙之后, 进入下一轮循环
else if (onlyIfAbsent // 检查第一个节点是否符合条件是否满足onlyIfAbsent ,是就直接返回不需要替换了
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
// 桶下标冲突,锁住链表头节点
synchronized (f) {
// 再次确认链表头节点没有被移动
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 用于累加,记录链表的长度
binCount = 1;
// 遍历链表
for (Node e = f;; ++binCount) {
K ek;
// 找到相同的 key
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
//更新
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
// 已经是最后的节点了, 新增 Node, 追加至链表尾
if ((e = e.next) == null) {
pred.next = new Node(hash, key, value);
break;
}
}
}
//是红黑树节点
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
// putTreeval 会看 key 是否已经在树中, 是, 则返回对应的 TreeNode
if ((p = ((TreeBin)f).putTreeval(hash, key,
value)) != null) {
oldVal = p.val;//保存旧值的引用
if (!onlyIfAbsent)
p.val = value;//新值替换旧值
}
}
//是用在 compute 以及 computeIfAbsent 时, 用来占位的节点此方法不支持
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
// 释放链表头节点的锁
}
if (binCount != 0) {
// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
if (binCount >= TREEIFY_THRESHOLD)
// 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
treeifyBin(tab, i);// 如果链表长度 >= 树化阈值(8), 进行链表转为红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
// 增加 size 计数,每次+1
addCount(1L, binCount);
return null;
}
}
private final Node[] initTable() {
Node[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { //第一次put的时候,table还没被初始化,进入while
if ((sc = sizeCtl) < 0) //sizeCtl初始值为0,当小于0的时候表示在别的线程在初始化表或扩展表
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //cas尝试将 sizeCtl 设置为 -1(表示初始化 table) ,SIZECTL:表示当前对象的内存偏移量,sc表示期望值,-1表示要替换的值,设定为-1表示要初始化表了,这一步会将sizeCtl设置成-1
try {
if ((tab = table) == null || tab.length == 0) {
//指定了大小的时候就创建指定大小的Node数组,否则创建指定大小(16)的Node数组
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node,?>[n];
table = tab = nt;
sc = n - (n >>> 2);// 其实就是 0.75 * n;下一次扩容的阈值
}
} finally {
sizeCtl = sc; //初始化后,sizeCtl长度为数组长度的3/4
}
break;
}
}
return tab;
}
//当数组长度小于64的时候,扩张数组长度一倍,否则的话把链表转为树
private final void treeifyBin(Node[] tab, int index) {
Node b; int n, sc;
if (tab != null) {
// MIN_TREEIFY_CAPACITY 为 64
// 所以,如果数组长度小于 64 的时候, 会进行数组扩容为一倍
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
// b 是头结点,table的容量 ≥ MIN_TREEIFY_CAPACITY(64)时,进行链表 -> 红黑树的转换
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {//要求b为普通node节点
// 加锁
synchronized (b) {
if (tabAt(tab, index) == b) {
// 下面就是遍历链表,建立一颗红黑树
TreeNode hd = null, tl = null;
for (Node e = b; e != null; e = e.next) {
TreeNode p =
new TreeNode(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 将链表里的节点转成红黑树节点并设置到数组相应桶位中
setTabAt(tab, index, new TreeBin(hd));
}
}
}
}
}
// check 是之前binCount的个数
//借用longadder的分散热点思想设置多个累加单元降低cas锁的冲突
private final void addCount(long x, int check) {
CounterCell[] as;
long b, s;
if (
// 已经有了 counterCells, 首先尝试更新baseCount
(as = counterCells) != null ||
// 还没有, 向 baseCount 累加
!U.compareAndSwapLong(this, baseCOUNT, b = baseCount, s = b + x)
) {
// 更新失败,说明出现并发冲突,则将计数值累加到Cell槽
CounterCell a;
long v;
int m;
boolean uncontended = true;
if (
// 还没有 counterCells 槽数组
as == null || (m = as.length - 1) < 0 ||
// 还没有 cell 根据线程hash值计算槽索引
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// 执行累加单元累加
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x) ) {
// 创建累加单元数组和cell, 槽更新也失败, 则会执行fullAddCount
// 只有 CAS 替换失败时,uncontended 才为 false,否则都为 true
// CAS 替换这里,因为 ThreadLocalRandom#getProbe 的原因,还存在以下情况
// 1. Thread 的 Probe 默认情况下是不会初始化的,所以只有初值 0 ,只有通过 ThreadLocalRandom#current 才会强制初始化
// 也就是说,多线程情况下,如果线程都没有初始化过 Probe,此时如果 CAS 都替换成功,则值都增加到同一个 CounterCell[0] 中,
// 这种情况并不影响并发考虑,因为 CAS 替换的最终也只是一个值,而不是一个队列或数组,不会过分增加负担
// 另外两种情况在方法内被处理
fullAddCount(x, uncontended);
return;
}
if (check <= 1)//链表长度小于等于1,不需要考虑扩容
return;
// 获取元素个数
s = sumCount();
}
if (check >= 0) {// 检测是否扩容
Node[] tab, nt;
int n, sc;
//s标识集合大小,如果集合大小大于或等于扩容阈值(默认值的0.75)
// 并且table不为空并且table的长度小于最大容量
while (s >= (long) (sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);//生成扩容标志戳,当前线程必须拿到扩容标识戳才能参与帮助扩容
if (sc < 0) {//其他线程感知sc已经>> RESIZE_STAMP_SHIFT!=rs 表示比较高RESIZE_STAMP_BITS位生成戳和rs是否相等,相同才能参与扩容
// sc==rs+1 表示扩容结束
// sc==rs+MAX_RESIZERS 表示帮助线程线程已经达到最大值了
// nt=nextTable -> 表示扩容已经结束
// transferIndex<=0 表示所有的transfer迁移任务都被领取完了,没有剩余的hash桶来给自己自己好这个线程来做transfer
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 其他线程知道newtable 已经创建了,帮忙扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 当前线程需要扩容,这时 newtable 未创建,cas 将sc改成负数,进入扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();// 重新计数,判断是否需要开启下一轮扩容
}
}
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//获取当前线程的probe的值,如果值为0,则初始化当前线程的probe的值,probe就是随机数
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;// 由于重新生成了probe,未冲突标志位设置为true
}
boolean collide = false; // true 表示最后一个插槽不为空
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//说明counterCells已经被其他线程初始化过了,现在需要创建累加单元
if ((as = counterCells) != null && (n = as.length) > 0) {
//通过该值与当前线程probe求&,获得as槽位的下标元素,和hash 表获取索引是一样的,null说明累加槽没有创建
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { //cellsBusy=0表示counterCells不在初始化或者扩容状态下
CounterCell r = new CounterCell(x); //构造一个CounterCell的值,传入元素个数
//通过cas设置CELLSBUSY标识,表示当前线程已经在初始化槽位CounterCell,防止其他线程来对counterCells并发处理
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
//双重检查如果槽位为null,则将初始化的r对象的元素个数放在对应下标的位置
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {//恢复标志位
cellsBusy = 0;
}
if (created)//创建成功,退出循环
break;
continue; //说明指定rs下标位置的数据不为空,则进行下一次循环
}
}
collide = false;
}
//说明在addCount方法中累加单元cas累加失败了,并且获取probe的值不为空
else if (!wasUncontended)
wasUncontended = true; //设置为未冲突标识,进入下一次自旋
//由于指定下标位置的cell值不为空,则直接通过cas进行原子累加,如果成功,则直接退出
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//如果已经有其他线程建立了新的counterCells或者CounterCells大于CPU核心数(很巧妙,线程的并发数不会超过cpu核心数)
else if (counterCells != as || n >= NCPU)
collide = false; //设置当前线程的循环失败不进行扩容
else if (!collide)//恢复collide状态,标识下次循环会进行扩容
collide = true;
//进入这个步骤,说明CounterCell数组容量不够,线程竞争较大,所以先设置一个标识表示为正在扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {//确保as 和 counterCells 引用一致表示是当前线程在操作
//将counterCells扩容一倍 2变成4,
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;//恢复标识
}
collide = false;
continue; //继续下一次自旋
}
h = ThreadLocalRandom.advanceProbe(h);//更新随机数的值
}
//cellsBusy=0表示没有在做初始化,通过cas更新cellsbusy的值标注当前线程正在做初始化操作
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // 初始化 counterCells
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];//初始化容量为2
rs[h & 1] = new CounterCell(x);//将x也就是元素的个数放在指定的数组下标位置
counterCells = rs;//赋值给counterCells
init = true;//设置初始化完成标识
}
} finally {
cellsBusy = 0;//恢复标识
}
if (init)
break;
}
//竞争激烈,其它线程占据cell 数组,直接累加在base变量中
else if (U.compareAndSwapLong(this, baseCOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
计算索引的方式:i = (n - 1) & hash
n - 1 == table.length - 1,table.length 的大小必须为2的幂次的原因就在这里。
当table.length为2的幂次时除了最高为1其他都为0 ,(table.length-1)的二进制形式的特点是除最高位外全部是1,配合这种索引计算方式可以模拟取余运算实现key在table中的均匀分布,减少hash冲突——出现hash冲突时,结点就需要以链表或红黑树的形式链接到table[i],这样无论是插入还是查找都需要额外的时间
putTreeval向红黑树添加节点,如果存在相同的key值则直接返回,否则进行添加,插入成功打破平衡会修复红黑树
final TreeNodesize 计算流程putTreeval(int h, K k, V v) { Class> kc = null; boolean searched = false; for (TreeNode p = root;;) { int dir, ph; K pk; //初始化根节点 if (p == null) { first = root = new TreeNode (h, k, v, null, null); break; } //是在左分支;比较大小先会通过hash值,key的compareTo方法(如果实现了comparable),没有则的是最后的key的className string和System.identityHashCode(key) else if ((ph = p.hash) > h) dir = -1; //在右分支 else if (ph < h) dir = 1; //插入的key和根节点一致则直接返回,上面的putValue方法会对根节点值进行替换 else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if ((kc == null && //判断是否实现了Comparable接口或者Comparator这种自定义接口 (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) { //没有实现以上两个接口,红黑树不能比较大小,则进行查找看给定的key是否是红黑树的节点,是则返回 if (!searched) { TreeNode q, ch; searched = true; if (((ch = p.left) != null && (q = ch.findTreeNode(h, k, kc)) != null) || ((ch = p.right) != null && (q = ch.findTreeNode(h, k, kc)) != null)) return q; } //当哈希码不可比较时,用于插入的中断实用程序;不需要一个总的顺序,只是一个一致的插入规则,以保持对等的再平衡 //其实就是调用key的className string和System.identityHashCode(key):返回的是对象内存中的物理地址产生的hash值, 而不管子类是否重写,和hascode则需要考虑子类是否重新了hashcode方法 dir = tieBreakOrder(k, pk); } TreeNode xp = p; //找到对应的插入位置的父节点;根据dir选择插入位置 if ((p = (dir <= 0) ? p.left : p.right) == null) { TreeNode x, f = first; first = x = new TreeNode (h, k, v, f, xp); if (f != null) f.prev = x; if (dir <= 0) xp.left = x; else xp.right = x; if (!xp.red) x.red = true;//插入节点设置为红色,减少平衡修复 else { lockRoot();//需要平衡修复红黑树时上写锁:这里是由cas LockSupport.park实现 try { root = balanceInsertion(root, x);//平衡红黑树插入修复 } finally { unlockRoot();// 设置lockState = 0; } } break; } } assert checkInvariants(root);//递归检查红黑树的根节点是否改变 return null; } // 锁住TreeBin结点 private final void lockRoot() { //检查 LOCKSTATE 值是否等于 0,如果是 0,则说明没有读线程在检索数据,也没有其他线程持有写锁,这时候可以直接写入数据,无需获取写锁,否则存在读线程或者正在等待获取写锁或者其他线程已经获得写锁了 if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER)) contendedLock(); // offload to separate method } private final void contendedLock() { boolean waiting = false; for (int s;;) { //~WAITER=-3,if 成立则s必然等于2说明等待获取写锁 //最后一个读线程读取完成唤醒了当前写线程,写线程尝试获取写锁 if (((s = lockState) & ~WAITER) == 0) { //cas 获取写锁,cas失败说明其他线程已经获取写锁了,当前线程开始自旋 if (U.compareAndSetInt(this, LOCKSTATE, s, WRITER)) { if (waiting) waiter = null;//成功了回收waiter return; } } //如果s第二位是0,表示当前没有线程在等待写锁,执行到这里说明存在其他读线程在操作红黑树,当前写线程需要等待读锁的释放 else if ((s & WAITER) == 0) { //将lockState的第二位设置为1,相当于打上了waiter的标记(LOCKSTATE=2),表示该线程在等待写锁 //主意这里的s即lockState是线程内部变量初始为0 ,所以0|2 =2 if (U.compareAndSetInt(this, LOCKSTATE, s, s | WAITER)) { waiting = true; waiter = Thread.currentThread(); } } //阻塞当前线程,会在get线程持有读锁的最后一个线程读取完成后唤醒此线程使其继续获取写锁 else if (waiting) LockSupport.park(this); } }
size 计算实际发生在 put,remove 改变集合元素的操作之中
- 没有竞争发生,向 baseCount 累加计数
- 有竞争发生,新建 counterCells,向其中的一个 cell 累加计数
- counterCells 初始有两个 cell
- 如果计数竞争比较激烈,会创建新的 cell 来累加计数
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
// 将 baseCount 计数与所有 cell 计数累加
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
扩容
- 已经有其它线程正在执行扩容了,则当前线程会尝试协助“数据迁移;(多线程并发)
- 没有其它线程正在执行扩容,则当前线程自身发起扩容。(单线程)
sizeCtl在扩容的作用
- 每一个执行扩容任务的线程(包含协助扩容),它在开始工作之前,都会更新 sizeCtl的低 16 位,即让低 16 位 +1,说明又加入一个新的线程去执行扩容。
- 每个执行扩容的线程都会被分配一个迁移工作任务区间,如果当前线程所负责的任务区间迁移工作完成了,没有再被分配迁移任务区间,那么此时当前线程就可以退出协助扩容了,这时候更新 sizeCtl的低 16 位,即让低 16 位 -1,说明有一个线程退出并发扩容了。
- 如果 sizeCtl 低 16 位-1后的值为 1,则说明当前线程是最后一个退出并发扩容的线程。
//在tryPresize方法中,并没有加锁,允许多个线程进入,如果数组正在扩张,则当前线程也去帮助扩容。
// 方法参数 size 传进来的时候就已经翻了倍了
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node,?>[n];// 扩容一个长度是n的新数组
table = nt;
sc = n - (n >>> 2); // 相当于0.75 * n
}
} finally {
sizeCtl = sc;
}
}
}
// c <= sc说明已经被扩容过了;n >= MAXIMUM_CAPACITY说明table数组已达到最大容量
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {//进行table扩容
// 根据容量n生成一个随机数,唯一标识本次扩容操作,当前线程必须拿到扩容标识戳才能参与扩容
int rs = resizeStamp(n);
if (sc < 0) {// sc < 0 表明此时有别的线程正在进行扩容
Node[] nt;
// 如果当前线程无法协助进行数据转移, 则退出
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 协助数据转移, 把正在执行transfer任务的线程数加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// sc置为负数, 当前线程自身成为第一个执行transfer(数据转移)的线程
// 这个CAS操作可以保证,仅有一个线程会执行扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
final Node[] helpTransfer(Node[] tab, Node f) {
Node[] nextTab; int sc;
//数组不为空且当前节点为FWD节点转移的目标数组不为空
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode)f).nextTable) != null) {
//根据数组长度计算一个唯一的数据戳(一个很大的负数)
int rs = resizeStamp(tab.length);
//检查nextTab和table都未发生更改,且sizeCtl<0
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//将sc设置未sc+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
//扩容
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
//返回:高16位为0,第16位为1,低15位存放当前容量n,用于表示是对n的扩容。保证是在原容量为n的情况下进行扩容。
//RESIZE_STAMP_SHIFT配合可以求出新的sizeCtl的值
//1 sc < 0 已经有线程在扩容,将sizeCtl+1并调用transfer()让当前线程参与扩容。
//2 sc >= 0 表示没有线程在扩容,使用CAS将sizeCtl的值改为(rs << RESIZE_STAMP_SHIFT) + 2)。
//rs即resizeStamp(n),记temp=rs << RESIZE_STAMP_SHIFT。
static final int resizeStamp(int n) {
//Integer.numberOfLeadingZeros(n)用于计算n转换成二进制后前面有几个0;
//(1 << (RESIZE_STAMP_BITS - 1)即是1<<15,表示为二进制即是高16位为0,低16位为1
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
数据迁移
数据迁移,就是把旧table中的各个槽中的结点重新分配到新table中。
数据迁移会将数组分成若干个相同步长的小任务来迁移,这样做的目的是在多线程环境中,每个线程会领取未被领取的小任务(每个调用tranfer的线程会对当前旧table中[transferIndex-stride, transferIndex-1]位置的结点进行迁移),并发处理不同小任务,提高吞吐量,迁移的顺序是从后往前。每个线程都会尽量去做所有任务,通过transferIndex来表示任务分配的进度,对transferIndex的操作是cas的迁移都是将链表或红黑树一分为二,放到i或i+n处,即把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置
-
首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用,
-
每个CPU最少处理16个长度的数组元素,也就是说,如果一个数组的长度只有16,在扩容的时候每个线程都有处理的步长,最少为16,在这个步长范围内的数组节点只有自己一个线程来处理
-
每个线程进来的线程去获取自己能处理区域,每个线程通过cas操作设置transferIndex变量,设置成功则线程获得table中[transferIndex-stride, transferIndex-1】这一段的处理权限,先来的线程从后面获取桶段
-
当table数组的大小为2的幂次时,通过key.hash & table.length-1这种方式计算出的索引i,当table扩容后(2倍),新的索引要么在原来的位置i,要么是i+n,这就导致扩容后key对应的索引如果发生了变化,那么其变化后的索引最高位一定是1
- 通过高低位计算不同的索引,保证扩容之后通过(n-1)&hash 还能命中该hash值对应的元素
- 可以保证尽可能的重用尾部的链表,提高扩容的效率。处理的时候lastRun之后的链表是无需遍历和重新构造的
验证高低位 ---------------------------------- 验证数据如下:扩容前长度16->32扩容后长度,Node1(48,“0”,“0”,Node2);Node2(1568,“11”,“11”,null); -------------------------------扩容前-------------------------------------------------------------------- (16-1) 0000000001111 | (16-1) 0000000001111 & | & 48 0000000110000 | 1568 0 0000000000000 | 0 0011000100000 -------------------------------扩容后-------------------------------------------------------------------- 可以看到扩容前,Node1和Node2都在原数组的0号桶中 根据hash&oldcap 得到是高位链表还是低位 48&16=16 ,1568&16=0 ,因此推算出Node1扩容后新的索引为16,Node2扩容后新的索引为0 即:(新数组的长度-1)&Node1.hash=原始下标+原始数组长度=16,(新数组的长度-1)&Node2.hash=原始下标=0 即:31&48=0+16=16,31&1568=0 --------------------------------------------------------------------------------------------------------------- (32-1) 00000000000011111 | (32-1) 00000000000011111 & | & 48 00000000000110000 | 1568 00000011000100000 16 00000000000010000 | 0 00000000000000000 --------------------------------------------------------------------------------------------------------------- 可以看到:(新数组的长度-1)&Node1.hash=原始下标+原始数组长度=16,(新数组的长度-1)&Node2.hash=原始下标=0 确实成立。 ---------------------------------------------------------------------------------------------------------------
- 处理完成一个桶后,会把该桶对应位置设置为执行新链表的ForwardingNode,表示该桶位已经处理过了,当put操作遇到ForwardingNode节点时会帮助转移,然后进入新table操作
- 复制后在新数组中的链表不是绝对的反序的
private final void transfer(Node[] tab, Node [] nextTab) { int n = tab.length, stride; // stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16 // stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,每个线程要负责旧table中的多少个桶 // 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { //首次扩容 try { // 创建新table数组,容量翻倍 Node [] nt = (Node [])new Node,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // 处理内存溢出(OOME)的情况 sizeCtl = Integer.MAX_VALUE;//扩容失败,sizeCtl使用int的最大值 return; } //更新成员变量 nextTable = nextTab; // transferIndex用于控制迁移的位置;[transferIndex-stride, transferIndex-1]表示当前线程要进行数据迁移的桶区间 //从右向左迁移,transferIndex 初始值为n transferIndex = n; } int nextn = nextTab.length;//新的tab的长度 // 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED // 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后, // 就会将位置 i处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了 // 所以它其实相当于是一个标志。 ForwardingNode fwd = new ForwardingNode (nextTab); // 标识一个桶的迁移工作是否完成,advance == true 表示可以进行下一个位置的迁移 如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进 boolean advance = true; boolean finishing = false; // 最后一个数据迁移的线程将该值置为true,并进行本轮扩容的收尾工作 // i 是位置索引,bound 是边界,注意是从后往前 //通过for自循环处理每个槽位中的链表元素,默认advace为真,通过CAS设置transferIndex属性值, // 并初始化i和bound值,i指当前处理的槽位序号,bound指需要处理的槽位边界,先处理槽位15的节点; for (int i = 0, bound = 0;;) { Node f; int fh; // 这个循环使用CAS不断尝试为当前线程分配任务,设置迁移的索引区间 // 直到分配成功或任务队列已经被全部分配完毕 // 如果当前线程已经被分配过bucket区域 // 那么会通过--i指向下一个待处理bucket然后退出该循环 // 每一次自旋前的预处理,主要是定位本轮处理的桶区间 // 正常情况下,预处理完成后:i == transferIndex-1,bound == transferIndex-stride while (advance) { int nextIndex, nextBound; //--i表示下一个待处理的bucket,如果它>=bound,表示当前线程已经分配过bucket区域 if (--i >= bound || finishing) advance = false; // 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了,表示所有bucket已经被分配完毕 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //cas来修改TRANSFERINDEX,为当前线程分配任务 ---> 初始范围为[transferIndex-stride, transferIndex-1]=[48,63] else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // nextBound 是这次迁移任务结束的边界索引,注意,是从后往前 bound = nextBound; i = nextIndex - 1;//i表示迁移开始索引 advance = false; } } //i<0说明已经遍历完旧的数组,也就是当前线程已经处理完所有负责的bucket if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // 所有的迁移操作已经完成 nextTable = null; // 将新的 nextTab 赋值给 table 属性,完成迁移 table = nextTab; // 重新计算 sizeCtl: n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍,代表下一次扩容 sizeCtl = (n << 1) - (n >>> 1); return; } // sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2 // 然后,每有一个线程参与迁移就会将 sizeCtl 加 1, // 这里使用 CAS 操作对 sizeCtl 进行减 1,表示当前线程已完成自己的transfer任务 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 判断当前线程是否是本轮扩容中的最后一个线程,如果不是,则直接退出 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; // 扩容结束了,更新 finising 变量 i = n; // 再次循环一次检查桶位是否完全迁移完毕 } } // 旧桶本身位置为null,不用迁移,直接尝试放一个ForwardingNode,作为占位节点 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 该位置处是一个 ForwardingNode,代表该位置已经迁移过了 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node ln, hn; // 头结点的 hash 大于 0,说明是链表的 Node 节点 if (fh >= 0) { int runBit = fh & n; // 由于n是2的幂次,所以runBit要么是0,要么高位是1(结果为n) //lastRun最终要处理的节点:最后保持runBit发生改变的节点,后面的节点runbit都有一样的 Node lastRun = f; for (Node p = f.next; p != null; p = p.next) { // 取于桶中每个节点的 hash 值 //下一个节点的hash&n 当前节点的hash&n,因为一直循环所以就是尾结点和头结点的hash&n比较 int b = p.hash & n; // 如果节点的 hash 值和首节点的 hash 值取于结果不同 if (b != runBit) { runBit = b;//更新runbit为最后反生改变的值,用于下面判断 lastRun 该赋值给 ln 还是 hn。 lastRun = p;// 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环 } } if (runBit == 0) {//如果最后更新的 runBit为0,设置低位节点 ln = lastRun; hn = null; } else {//否则,设置高位节点 hn = lastRun;// 如果最后更新的 runBit高位是 1, 设置高位节点 ln = null; } // 以lastRun所指向的结点为分界,将链表拆成2个子链表ln、hn来构造高位以及低位的链表 ; //分别放到原来的位置和新增加的长度的相同位置(i/n+i) // 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果) // 将原本的一个链表根据hash&n分为2个链表,构建新链表采用头插法 // 无法概括两个新链表相对旧链表的顺序,有很多可能,并不是一个正序,一个倒序 for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 如果与运算结果是 0,那么就还在低位 if ((ph & n) == 0) ln = new Node (ph, pk, pv, ln); else // 1 则创建高位 hn = new Node (ph, pk, pv, hn); } //将低位的链表放在i位置也就是不动 低位链不需要变 setTabAt(nextTab, i, ln); //将高位链表放在i+n位置,n是数组的长度 setTabAt(nextTab, i + n, hn); // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕, // 把旧 table的hash桶中放置转发节点, 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了 setTabAt(tab, i, fwd); // advance 设置为 true,代表该位置已经迁移完毕 advance = true; } //红黑树迁移 else if (f instanceof TreeBin) { TreeBin t = (TreeBin )f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 在复制完树节点之后,判断该节点处构成的树还有几个节点,如果≤6个的话,就转回为一个链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin (lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin (hi) : t; // 将 ln 放置在新数组的位置 i setTabAt(nextTab, i, ln); // 将 hn 放置在新数组的位置 i+n setTabAt(nextTab, i + n, hn); // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕, // 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了 setTabAt(tab, i, fwd); // advance 设置为 true,代表该位置已经迁移完毕 advance = true; } } } } } }
1 计算stride的值,单核就是表长度,多核是表长度/8/核数,最小16,这个字段的含义是一次处理多少个hash桶。
CPU核数与迁移任务hash桶数量分配(步长)的关系
2.如果nextTab==null,创建nextTab,原长度的2倍
3.构建ForwardingNode节点,其hash就是模式moved,里面包含的就是新表
4.死循环开始转移节点到新表:
先循环找到下一个处理的hash表位置。过程是倒着处理的,先处理数组的后面部分,transferIndex指示下一次处理的下标开始位置。stride是一次处理多少个hash桶。跳过这个查找位置的方法就是所有节点都已经被分配处理了,通过CAS线程抢夺处理权,这里就巧妙的进行了多线程分段处理,各个线程互不干扰。只要其他线程要等所有处理完才结束,那么就不会产生线程安全问题。
如果i<0等条件成立,意味着当前线程没有可以处理的,判断sc的大小,如果符合结束的判断改变并设置finish,在下一个循环完成表的更新。这里可以看出结束判断都是交给sc的变化,这个对所有线程都是一样处理判断的,判断不通过会直接返回,任何线程进入resize的transfer环节都会使得sc+1,所有线程离开都会-1,问题是只有一个线程才会知道所有的都处理完成了,改变table
高低位拆分 单线程下线程的任务分配与迁移 多线程如何分配任务迁移
迁移完成
Java 8 数组(Node) +( 链表 Node | 红黑树 TreeNode ) 以下数组简称(table),链表简称(bin)
- 初始化,使用 cas 来保证并发安全,懒惰初始化 table
- 树化,当 table.length < 64 时,先尝试扩容,超过 64 时,并且 bin.length > 8 时,会将链表树化,树化过程
会用 synchronized 锁住链表头 - put,如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,锁住链表头进行后续 put 操作,元素
添加至 bin 的尾部 - get,无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新
table 进行搜索 - 扩容,扩容时以 bin 为单位进行,需要对 bin 进行 synchronized,但这时妙的是其它竞争线程也不是无事可
做,它们会帮助把其它 bin 进行扩容,扩容时平均只有 1/6 的节点会把复制到新 table 中 - size,元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell[] 当中。最后统计数量时累加
即可 - 在get操作中,根本没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的。因为Node的成员val是用volatile修饰的,在多线程环境下线程A修改结点的val或者新增节点的时候是对线程B可见的。当数组在扩容的时候,会对当前操作节点进行判断,如果当前节点还没有被设置成fwd节点,那就可以进行读写操作,如果该节点已经被处理了,那么当前线程也会加入到扩容的操作中去。
- 如果当前链表已经迁移完成,那么头节点会被设置成fwd节点,此时写线程会帮助扩容,如果扩容没有完成,当前链表的头节点会被锁住,所以写线程会被阻塞,直到扩容完成
- 只有在往map中添加元素的时候,在某一个节点的数目已经超过了8个,同时数组的长度又小于64的时候,才会触发数组的扩容。当数组中元素达到了sizeCtl的数量的时候,则会调用transfer方法来进行扩容
当在进行数组扩容的时候,如果当前节点还没有被处理(也就是说还没有设置为fwd节点),那就可以进行设置操作。如果该节点已经被处理了,则当前线程也会加入到扩容的操作中去。
在ConcurrentHashMap中,同步处理主要是通过Synchronized和unsafe两种方式来完成的。
-
在取得sizeCtl、某个位置的Node的时候,使用的都是unsafe的方法,来达到并发安全的目的
-
当需要在某个位置设置节点的时候,则会通过Synchronized的同步机制来锁定该位置的节点。
-
在数组扩容的时候,则通过处理的步长和fwd节点来达到并发安全的目的,通过设置hash值为MOVED
-
当把某个位置的节点复制到扩张后的table的时候,也通过Synchronized的同步机制来保证现程安全
Unsafe:操作的使用仍然和JDK7中的类似,主要负责井发安全的修改对象的属性或数组某个位置的值。
synchronized主要负责在需要操作某个位置时进行加锁(该位置不为空),比如向某个位置的链表进行插入结点,向某个位置的红黑树插入结点。
JDK8中其实仍然有分段锁的思想,只不过JDK7中段数是可以控制的,而JDK8中是数组的每一个位置都有一把锁
当向 Concurrenthashmap中put一个key, value时
-
首先根据key计算对应的数组下标,如果该位置没有元素,则通过自旋的方法去向该位置赋值。
-
如果该位置是fwd节点则当前线程帮助扩容数据迁移
-
如果该位置有元素,则 synchronized会加锁
-
加锁成功之后,在判断该元素的类型
a.如果是链表节点则进行添加节点到链表中
b.如果是红黑树则添加节点到红黑树
5 添加成功后,判断是否需要进行树化
6 addcount,将 Concurrenthashmap的元素个数加1,但是这个操作也是需要并发安全的,竞争激烈 需要采用分散热点创建累加单元,并 且元素个数加1成功后,会继续 判断是否要进行扩容,如果需要,则会进行扩容
JDK7和JDK8中区别- JDK8 中没有分段锁了,而是使用 synchronized来进行控制
- JDK8 中的扩容性能更高,支持多线程同时扩容,实际上JDK7中也支持多线程扩容,因为JDK7中的扩容是针对每个 Segment的,所以也可能多线程扩容,但是性能没有JDK8高,因为JDK8中对于任意一个线程都可以去帮助扩容
- JDK8 中的元素个数统计的实现也不ー样了,JDK8中增加了 Countefcell来帮助计数,而JDK7中没有,JDK7中是put的时候每个 Segment内部计数,统计的时候是遍历每个 Segment对象加锁统计
- 普通的 HashMap 的负载因子可以修改,但是 ConcurrentHashMap 不可以,因为它的负载因子使用 final关键字修饰,值是固定的 0.75
相同
- 都是数组 +链表+红黑树的数据结构(JDK8之后),所以基本操作的思想一致
- 都实现了Map接口,继承了AbstractMap 操作类,所以方法大都相似,可以相互切换
异同
-
ConcurrentHashMap 是线程安全的,多线程环境下,无需加锁直接使用
-
ConcurrentHashMap 多了转移节点,主要用户保证扩容时的线程安全
-
HashMap的扩容是创建一个新数组,将值直接放入新数组中,JDK7采用头链接法,会出现死循环,JDK8采用尾链接法,不会造成死循环
ConcurrentHashMap 扩容是从数组队尾开始拷贝,拷贝槽点时会锁住槽点,拷贝完成后将槽点设置为转移节点。所以槽点拷贝完成后将新数组赋值给容器



