ConcurrentHashMap 源码解析
线程安全的。
ConcurrentHashMap 1.7
数据结构: Segment 数组 + HashEntry 数组 + 链表
···
ConcurrnetHashMap 由很多个 Segment 组合,而每一个 Segment 是一个类似于 HashMap 的结构,所以每一个 HashMap 的内部可以进行扩容。
但是 Segment 的个数一旦初始化就不能改变,默认 Segment 的个数是 16 个,(你也可以认为 ConcurrentHashMap 默认支持最多 16 个线程并发。)
···
1.初始化
/**
*创建一个新的空映射,默认的初始容量(16),
*负载因子(0.75)和并发级别(16)。
*/
public ConcurrentHashMap() {
this(
DEFAULT_INITIAL_CAPACITY,
DEFAULT_LOAD_FACTOR,
DEFAULT_CONCURRENCY_LEVEL);
}
无参构造中调用了有参构造,传入了三个参数的默认值,他们的值是。
static final int DEFAULT_INITIAL_CAPACITY = 16; 默认初始化容量
static final float DEFAULT_LOAD_FACTOR = 0.75f; 默认负载因子
static final int DEFAULT_CONCURRENCY_LEVEL = 16; 默认并发级别
以下 是 无参构造方法 调用的 有参构造
···
@SuppressWarnings("unchecked")
public ConcurrentHashMap(
int initialCapacity,
float loadFactor,
int concurrencyLevel) {
// 参数校验
if (!(loadFactor > 0)
|| initialCapacity < 0
|| concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 校验并发级别大小,大于 1<<16,重置为 65536
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 找到2次幂大小的最佳匹配参数
// 2的多少次方
int sshift = 0;
int ssize = 1;
// 这个循环可以找到 concurrencyLevel 之上最近的 2的次方值
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 记录段偏移量
this.segmentShift = 32 - sshift;
// 记录段掩码
this.segmentMask = ssize - 1;
// 设置容量
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// c = 容量 / ssize ,默认 16 / 16 = 1,
这里是计算每个 Segment 中的类似于 HashMap 的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//Segment 中的类似于 HashMap 的容量至少是2或者2的倍数
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 创建 Segment 数组,设置 segments[0]
Segment s0 = new Segment
(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize];
//分段[0]的有序写入
UNSAFE.putOrderedObject(ss, Sbase, s0);
this.segments = ss;
}
···
总结 1.7 ConcurrnetHashMap 的 初始化原理
1.必要参数 校验。
2.校验 并发级别 concurrencyLevel 大小 , 如果 大于 最大值, 重置 为 最大值
默认 16
3. 寻找 并发级别 concurrencyLevel 之上 的 最近 2的幂次方 值 ,作为初始化值
默认16
4.记录 segmentShift 偏移量
这个值为【容量 = 2 的N次方】中的 N,在后面 Put 时计算位置时会用到。默认是 32 - sshift = 28.
5.记录 segmentMask,默认是 ssize - 1 = 16 -1 = 15.
6.初始化 segments[0]
默认大小为 2,负载因子 0.75,扩容阀值是 2*0.75=1.5**,插入第二个值时才会进行扩容。
** put () **
/**
*将指定的键映射到该表中的指定值。
*键和值都不能为空。
该值可以通过调用get方法来获取
*具有与原始密钥相同的密钥。
* @param key指定值关联的键
* @param value与指定键关联的值
* @return与键关联的上一个值,或者
* null如果没有映射键
* @throws NullPointerException如果指定的键或值为空
*/
public V put(K key, V value) {
Segment s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// hash 值无符号右移 28位(初始化时获得),
然后与 segmentMask=15 做与运算
// 其实也就是把高4位与segmentMask(1111)做与运算
int j = (hash >>> segmentShift) & segmentMask;
/ /非易失性;重新检查
if ((s = (Segment)UNSAFE.getObject
//在 ensureSegment
(segments, (j << SSHIFT) + Sbase)) == null)
// 如果查找到的 Segment 为空,初始化
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
/**
*返回给定索引的段,创建它和
*记录在段表(通过CAS),如果没有已经存在。
* @param k索引
* @return段
*/
@SuppressWarnings("unchecked")
private Segment ensureSegment(int k) {
final Segment[] ss = this.segments;
long u = (k << SSHIFT) + Sbase; // raw offset
Segment seg;
// 判断 u 位置的 Segment 是否为null
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment proto = ss[0]; // use segment 0 as prototype
// 获取0号 segment 里的 HashEntry 初始化长度
int cap = proto.table.length;
// 获取0号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是相同的
float lf = proto.loadFactor;
// 计算扩容阀值
int threshold = (int)(cap * lf);
// 创建一个 cap 容量的 HashEntry 数组
HashEntry[] tab = (HashEntry[])new HashEntry[cap];
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
// 再次检查 u 位置的 Segment 是否为null,因为这时可能有其他线程进行了操作
Segment s = new Segment(lf, threshold, tab);
// 自旋检查 u 位置的 Segment 是否为null
while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) {
// 使用CAS 赋值,只会成功一次
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
总结: ConcurrentHashMap 在 put()时候
1.计算 put()的key 位置 ,获取指定 位置 的 Segment;
2.如果 指定 位置 的 Segment 为空 ,则 初始化 这个 Segment;
初始化 Segment 流程 :
2-1.检查 计算 得到 位置 的 Segment 是否 为 null
2-2.为 null 继续 初始化 , 使用 Segment 的 容量 和 负载因子 创建 一个
HashEntry 数组
2-3.再次 检查 计算 得到 的 指定 位置 Segment 是否 为 null
2-4.使用 创建 的 HashEntry 数组 初始化 这个 Segment
2-5.自旋 判断 计算 得到 指定位置 的 Segment 是否为 null ,
使用 CAS 在 这个位置 赋值 Segment。
3. Segment . put 插入 key value 值
(上面探究了获取 Segment 段和初始化 Segment 段的操作。最后一行的 Segment 的 put 方法还没有查看,继续分析。)
···
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 获取 ReentrantLock 独占锁,获取不到,scanAndLockForPut 获取。
HashEntry node =
tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry[] tab = table;
// 计算要put的数据位置
int index = (tab.length - 1) & hash;
// CAS 获取 index 坐标的值
HashEntry first = entryAt(tab, index);
for (HashEntry e = first;;) {
if (e != null) {
// 检查是否 key 已经存在,如果存在,
则遍历链表寻找位置,找到后替换 value
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// first 有值没说明 index 位置已经有值了,
有冲突,链表头插法。
if (node != null)
node.setNext(first);
else
node =
new HashEntry
(hash, key, value, first);
int c = count + 1;
// 容量大于扩容阀值,
小于最大容量,进行扩容
if (c > threshold
&& tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// index 位置赋值 node,
node 可能是一个元素,也可能是一个链表的表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
由于 Segemnt 继承了 Reentrank, 所以 Reentrank 内部 可以 很方便 的 获取锁, put 流程 就用到了 这个流程。
1. trylock() 获取锁 获取不到 使用 scanAndLockForPut 方法 继续获取 2. 计算 put 的 数据要放入 的 index位置 , 然后 获取这个位置上的 HashEntry 3.遍历 put新元素 (为什么要遍历? 因为这里获取的 HashEntry 可能是一个空元素, 也可能是链表已存在,所以要区别对待。) ``` 1.如果 这个位置 的 hashentry 不存在 1.1 如果 当前容量 大于 扩容阙值 ,小于 最大容量 进行扩容。 1.2 直接头插法插入。 2. 如果这个位置上的 HashEntry 存在: 2.1 判断链表当前元素 Key 和 hash 值是否和要 put 的 key 和 hash 值一致。 一致则替换值 2.2 不一致,获取链表下一个节点,直到发现相同进行值替换, 或者链表表里完毕没有相同的 2.2.1 如果当前容量大于扩容阀值,小于最大容量,进行扩容。 2.2.2 直接链表头插法插入。 ``` 4.如果要插入的位置 已经存在 , 替换后 返回值 ,否则 返回 null。
这里面的第一步中的 scanAndLockForPut 操作这里没有介绍,
这个方法做的操作就是不断的自旋 tryLock() 获取锁。
当自旋次数大于指定次数时,使用 lock() 阻塞获取锁。
在自旋时顺表获取下 hash 位置的 HashEntry。
private HashEntryscanAndLockForPut(K key, int hash, V value) { HashEntry first = entryForHash(this, hash); HashEntry e = first; HashEntry node = null; int retries = -1; // negative while locating node // 自旋获取锁 while (!tryLock()) { HashEntry f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry (hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { // 自旋达到指定次数后,阻塞等到只到获取到锁 lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
**扩容 rehash **
ConcurrentHashMap 的扩容只会扩容到原来的两倍。
老数组里的数据移动到新的数组时,位置要么不变,
要么变为 index+ oldSize,参数里的 node 会在扩容之后使用链表头插法插入到指定位置。
private void rehash(HashEntrynode) { HashEntry [] oldTable = table; // 老容量 int oldCapacity = oldTable.length; // 新容量,扩大两倍 int newCapacity = oldCapacity << 1; // 新的扩容阀值 threshold = (int)(newCapacity * loadFactor); // 创建新的数组 HashEntry [] newTable = (HashEntry []) new HashEntry[newCapacity]; // 新的掩码,默认2扩容后是4,-1是3,二进制就是11。 int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { // 遍历老数组 HashEntry e = oldTable[i]; if (e != null) { HashEntry next = e.next; // 计算新的位置,新的位置只可能是不便或者是老的位置+老的容量。 int idx = e.hash & sizeMask; if (next == null) // Single node on list // 如果当前位置还不是链表,只是一个元素,直接赋值 newTable[idx] = e; else { // Reuse consecutive sequence at same slot // 如果是链表了 HashEntry lastRun = e; int lastIdx = idx; // 新的位置只可能是不便或者是老的位置+老的容量。 // 遍历结束后,lastRun 后面的元素位置都是相同的 for (HashEntry last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } // ,lastRun 后面的元素位置都是相同的,直接作为链表赋值到新位置。 newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry p = e; p != lastRun; p = p.next) { // 遍历剩余元素,头插法到指定 k 位置。 V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry n = newTable[k]; newTable[k] = new HashEntry (h, p.key, v, n); } } } } // 头插法插入新的节点 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
(有些同学可能会对最后的两个 for 循环有疑惑,这里第一个 for 是为了寻找这样一个节点,这个节点后面的所有 next 节点的新位置都是相同的。
然后把这个作为一个链表赋值到新位置。
第二个 for 循环是为了把剩余的元素通过头插法插入到指定位置链表。
这样实现的原因可能是基于概率统计,有深入研究的同学可以发表下意见。)
···
** get () **
- 计算得到 的 key 值 的 存放位置
- 遍历 指定 位置 查找 相同key 的 value 值。
public V get(Object key) {
//手动集成访问方法以减少开销
Segment s; 55/5000
HashEntry[] tab;
int h = hash(key);
long u = (((h >>> segmentShift)
& segmentMask) << SSHIFT) + Sbase;
// 计算得到 key 的存放位置
if ((s =
(Segment)UNSAFE.getObjectVolatile(segments, u)) != null
&& (tab = s.table) != null) {
for (HashEntry e =
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h))
<< TSHIFT) + Tbase);
e != null; e = e.next) {
// 如果是链表,遍历查找到相同 key 的 value。
K k;
if ((k = e.key) == key
|| (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
···
ConcurrentHashMap 1.8
···
public class ConcurrentHashMapextends AbstractMap implements ConcurrentMap , Serializable {
数据结构: Node 数组 + 链表 / 红黑树
当冲突链表达到一定长度时,链表会转换成红黑树。
···
- 初始化 initTable
private final Node[] initTable() { Node [] tab; int sc; while ((tab = table) == null || tab.length == 0) { //如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。 if ((sc = sizeCtl) < 0) // 让出 CPU 使用权 Thread.yield(); //失去了初始化竞赛;只是自旋 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
ConcurrentHashMap 初始化
通过 initTable()进行 自旋 和 CAS 操作 进行的。
里面需要注意的是变量 sizeCtl ,它的值决定着当前的初始化状态。
(CAS 的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值)
-1 说明正在初始化
-N 说明有N-1个线程正在进行扩容
表示 table 初始化大小,如果 table 没有初始化
表示 table 容量,如果 table 已经初始化。
···
2. put
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 和 value 不能为空
if (key == null || value == null)
throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node[] tab = table;;) {
// f = 目标位置元素
// fh 后面存放目标位置的元素 hash 值
Node f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 数组桶为空,初始化数组桶(自旋+CAS)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 桶内为空,CAS 放入,不加锁,成功了就直接 break 跳出
if (casTabAt
(tab, i, null,new Node(hash, key, value, null))
)
break; // 添加到空仓时没有锁
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 使用 synchronized 加锁加入节点
synchronized (f) {
if (tabAt(tab, i) == f) {
// 说明是链表
if (fh >= 0) {
binCount = 1;
// 循环加入新的或者覆盖节点
for (Node e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null
&& key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
if ((e = e.next) == null) {
pred.next =
new Node(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 红黑树
Node p;
binCount = 2;
if ((p =
((TreeBin)f).putTreeval
(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
put 总结 :
1.根据 key 计算出 hashcode 。
2.判断是否需要进行初始化。
3.即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,
利用 CAS 尝试写入,失败则自旋保证成功。
4.如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
5.如果都不满足,则利用 synchronized 锁写入数据。
6.如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。
···
3.get
public V get(Object key) {
Node[] tab;
Node e, p; int n, eh; K ek;
// key 所在的 hash 位置
int h = spread(key.hashCode());
if ((tab = table) != null
&& (n = tab.length) > 0
&& (e = tabAt(tab, (n - 1) & h)) != null) {
// 如果指定位置元素存在,头结点hash值相同
if ((eh = e.hash) == h) {
if ((ek = e.key) == key
|| (ek != null && key.equals(ek)))
// key hash 值相等,key值相同,直接返回元素 value
return e.val;
}
else if (eh < 0)
// 头结点hash值小于0,说明正在扩容或者是红黑树,find查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
// 是链表,遍历查找
if (e.hash == h &&
((ek = e.key) == key
|| (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
总结一下 get 过程:
1.根据 hash 值计算位置。
2.查找到指定位置,如果头节点就是要找的,直接返回它的 value.
3.如果头节点 hash 值小于 0 ,说明正在扩容或者是红黑树,查找之。
4.如果是链表,遍历查找之。
·········································································································
1.7 和 1.8 的 总结
Java7 中 ConcurrentHashMap 使用的分段锁,
也就是每一个 Segment 上同时只有一个线程可以操作,每一个 Segment 都是一个类似 HashMap 数组的结构,它可以扩容,它的冲突会转化为链表。
但是 Segment 的个数一但初始化就不能改变。
Java8 中的 ConcurrentHashMap 使用的 Synchronized 锁加 CAS 的机制。
结构也由 Java7 中的 Segment 数组 + HashEntry 数组 + 链表 进化成了
Node 数组 + 链表 / 红黑树,Node 是类似于一个 HashEntry 的结构。
它的冲突再达到一定大小时会转化成红黑树,
在冲突小于一定数量时又退回链表



