ConcurrentHashMap 源码解析_01 成员属性、内部类、构造方法解析 1、简介文章参考:小刘讲源码
ConcurrentHashMap 是 HashMap 的线程安全版本,内部也是使用(数组 + 链表 + 红黑树)的结构来存储元素。相比于同样线程安全的 HashTable 来说,效率等等各方面都有极大的提升。在学习 ConcurrentHashMap 源码之前,这里默认大家已经读过 HashMap 源码,了解 LongAdder 原子类、红黑树。参考:红黑树学习笔记(自己实现一个简单的红黑树)、HashMap 底层源码细致分析、JDK 集合linkedHashMap源码解析、JDK 8 新特性 LongAdder 源码解析
先简单介绍一下 ConcurrentHashMap 的整体流程:
整体流程跟 HashMap 比较类似,大致是以下几步:
- 如果桶数组未初始化,则初始化;如果待插入的元素所在的桶为空,则尝试把此元素直接插入都桶中的第一个位置;如果正在扩容,则当前线程一起加入到扩容的过程中;如果待插入的元素所在的桶不为空且不存在迁移的元素,则锁住这个桶(分段锁)如果当前桶中元素以链表方式存储,则在链表中寻找该元素或者插入元素;如果当前桶中元素以红黑树方式存储,则在红黑树中寻找该元素或者插入元素;如果元素存在,则返回旧值如果元素不存在,整个 Map 的元素加 1,并检查是否需要扩容;
添加元素操作中使用的锁主要有(自旋锁 + CAS + synchronized + 分段锁)。
为什么使用 synchronized 而不是 ReentrantLock?
因为 synchronized 已经得到了极大的优化,在特定情况下并不比 ReentrantLock 差。
2、JDK1.8 ConcurrentHashMap 结构图 3、成员属性private static final int MAXIMUM_CAPACITY = 1 << 30; private static final int DEFAULT_CAPACITY = 16; static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; private static final int DEFAULT_CONCURRENCY_LEVEL = 16; private static final float LOAD_FACTOR = 0.75f; static final int TREEIFY_THRESHOLD = 8; static final int UNTREEIFY_THRESHOLD = 6; static final int MIN_TREEIFY_CAPACITY = 64; private static final int MIN_TRANSFER_STRIDE = 16; private static int RESIZE_STAMP_BITS = 16; private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // 当node节点的hash值为-1的时候,表示当前节点是FWD节点 static final int MOVED = -1; // hash for forwarding nodes // 当node节点的hash值是-2的时候,表示当前节点已经树化了且当前节点为TreeBin对象,TreeBin对象代理操作红黑树 static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations // 0x7fffffff => 0111 1111 1111 1111 1111 1111 1111 1111 可以将一个负数通过位与运算后得到正数,但是不少取绝对值 static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash // 当前系统的CPU数量 static final int NCPU = Runtime.getRuntime().availableProcessors(); // 散列表对象,长度一定是2的次方数 transient volatile Node4、静态属性[] table; // 扩容过程中会将扩容中的新table赋值给nextTable,保持引用,扩容结束之后,这里会被设置为null private transient volatile Node [] nextTable; // LongAdder中的baseCount,未发生竞争的时候或者当前LongAdder处于加锁状态中,增量累加到baseCount中 private transient volatile long baseCount; private transient volatile int sizeCtl; // 扩容过程中,记录当前进度,所有线程都需要从transferIndex中分配区间任务,去执行自己的任务 private transient volatile int transferIndex; // LongAdder 中的cellsBusy 0表示当前LongAdder对象无锁状态,1表示处于加锁状态 private transient volatile int cellsBusy; // LongAdder 中的cells数组,当baseCount发生竞争后,会创建cells数组 // 多线程情况下,线程会通过计算hash值取到自己的cell位置,将增量累加到cell中 // 总个数 = sum(cells) + baseCount private transient volatile CounterCell[] counterCells;
// Unsafe mechanics private static final sun.misc.Unsafe U; // 表示sizeCtl属性在ConcurrentHashMap中的内存偏移地址 private static final long SIZECTL; // 表示transferIndex属性在ConcurrentHashMap中的内存偏移地址 private static final long TRANSFERINDEX; // 表示baseCount属性在ConcurrentHashMap中的内存偏移地址 private static final long baseCOUNT; // 表示cellsbusy属性在ConcurrentHashMap中的内存偏移地址 private static final long CELLSBUSY; // 表示cellsvalue属性在CounterCell中的内存偏移地址 private static final long CELLVALUE; // 表示数组第一个元素的偏移地址 private static final long Abase; // 该属性用于数组寻址,轻继续往下阅读即可了解 private static final int ASHIFT;5、静态代码块
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
baseCOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class> ak = Node[].class;
// 拿到数组第一个元素的偏移地址
Abase = U.arraybaseOffset(ak);
// 表示数组中每一个单元所占用的空间大小,即scale表示Node[]数组中每一个单元所占用的空间
int scale = U.arrayIndexScale(ak);
// (scale & (scale - 1)) != 0:判断scale的数值是否是2的次幂数
// java语言规范中,要求数组中计算出的scale必须为2的次幂数
// 1 0000 % 0 1111 = 0
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// numberOfLeadingZeros(scale) 根据scale,返回当前数值转换为二进制后,从高位到地位开始统计,统计有多少个0连续在一块:eg, 8转换二进制=>1000 则 numberOfLeadingZeros(8)的结果就是28,为什么呢?因为Integer是32位,1000占4位,那么前面就有32-4个0,即连续最长的0的个数为28个
// 4转换二进制=>100 则 numberOfLeadingZeros(8)的结果就是29
// ASHIFT = 31 - Integer.numberOfLeadingZeros(4) = 2 那么ASHIFT的作用是什么呢?其实它有数组寻址的一个作用:
// 拿到下标为5的Node[]数组元素的偏移地址(存储地址):假设此时 根据scale计算得到的ASHIFT = 2
// Abase + (5 << ASHIFT) == Abase + (5 << 2) == Abase + 5 * scale,就得到了下标为5的数组元素的偏移地址
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
6、内部类
6.1、Node 节点
static class Node6.2、ForwardingNode 节点implements Map.Entry { final int hash; final K key; // 保存内存可见性 volatile V val; volatile Node next; Node(int hash, K key, V val, Node next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } // 这里equals是以Entry节点来进行比较访问的 public final boolean equals(Object o) { Object k, v, u; Map.Entry,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } Node find(int h, Object k) { Node e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }
这个内部类在之后分析扩容的时候会再仔细进行探究,这里先熟悉一下:
// 如果是一个写的线程(eg:并发扩容线程),则需要为创建新表贡献一份力 // 如果是一个读的线程,则调用该内部类的find(int h, Object k)方法 static final class ForwardingNode6.3、TreeNode 节点extends Node { // nextTable表示新散列表的引用 final Node [] nextTable; ForwardingNode(Node [] tab) { super(MOVED, null, null, null); this.nextTable = tab; } // 到新表上去读数据 Node find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node [] tab = nextTable;;) { Node e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode )e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
TreeBin 中会用到该节点,之后会进行细说:
static final class TreeNode7、构造方法extends Node { // 父节点 TreeNode parent; // red-black tree links // 左子节点 TreeNode left; // 右节点 TreeNode right; // 前驱节点 TreeNode prev; // needed to unlink next upon deletion // 节点有红、黑两种颜色~ boolean red; TreeNode(int hash, K key, V val, Node next, TreeNode parent) { super(hash, key, val, next); this.parent = parent; } Node find(int h, Object k) { return findTreeNode(h, k, null); } final TreeNode findTreeNode(int h, Object k, Class> kc) { if (k != null) { TreeNode p = this; do { int ph, dir; K pk; TreeNode q; TreeNode pl = p.left, pr = p.right; if ((ph = p.hash) > h) p = pl; else if (ph < h) p = pr; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if (pl == null) p = pr; else if (pr == null) p = pl; else if ((kc != null || (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0) p = (dir < 0) ? pl : pr; else if ((q = pr.findTreeNode(h, k, kc)) != null) return q; else p = pl; } while (p != null); } return null; } }
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
构造方法与 HashMap 对比可以发现,没有了 HashMap 中的 thresold 和 loadFactor,而是改用了 sizaCtl 来控制,而且只存储了容量里面,那么它是怎么用的呢?官方给出的解释如下:
-1,表示有线程正在进行初始化操作。-(1 + nThreads),表示有 n 个线程正在一起扩容。0,默认值,后续再真正初始化的时候使用默认容量。> 0,初始化或者扩容完成后下一次的扩容门槛。 8、内部小方法分析
下面在正式分析并发 HashMap 成员方法之前,先分析一些内部类中的子方法函数:
首先看一下 ConcurrentHashMap 内部类 Node 中的 hash 成员属性值的计算方法 spread(int h):
static class Node8.1、spread(int h) 方法implements Map.Entry { final int hash;// 该属性是通过spread(int h)方法计算得到~ h 代表key final K key; volatile V val; volatile Node next; Node(int hash, K key, V val, Node next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } ... }
// 扰动函数
static final int spread(int h) {
// 让原来的hash值异或^原来hash值的右移16位,再&上HASH_BITS(0x7fffffff:二进制位31个1)
return (h ^ (h >>> 16)) & HASH_BITS;
}
下面介绍 tabAt(Node
@SuppressWarnings("unchecked")
static final Node tabAt(Node[] tab, int i) {
// ((long)i << ASHIFT) + Abase的作用:请看下面的分析
return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + Abase);
}
再分析 ((long)i << ASHIFT) + Abase 的时候,先复习一下上面所说的介绍的一些静态属性字段的含义:
// Node 数组的Class 对象
Class> ak = Node[].class;
// U.arraybaseOffset(ak):根据ak 获取NOde[] 数组第一个元素的偏移地址
Abase = U.arraybaseOffset(ak);
// 表示数组单元所占用空间大小,scale表示Node[]数组中每一个单元所占用的空间大小
int scale = U.arrayIndexScale(ak);
// 在java语言中scale必须是二的次方数,这里就是一个小算法判断scale是不是二的次方数
// 如果不是则报错
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// numberOfLeadingZeros(scale):根据scale,返回的当前数值转化位二进制后,这里注意(scale的值一定是2的次幂数),从那个高位到低位开始统计,统计有多少个0连续在一块
// eg:8 -> 1000 则numberOfLeadingZeros(8) 的值就是28,为什么呢?因为Integer是32位,1000占4位,那么前面就有32-4个0,即连续最长的0的个数为28个
// 4-> 100,则numberOfLeadingZeros(4)的值就是29
// ASHIFT = 31 - Integer.numberOfLeadingZeros(4) = 2,那么ASHIFT的作用是什么呢?其实它有一个数组寻址的一个作用:
// 拿到下标为5的Node[] 数组元素的偏移地址(存储地址):假设此时根据scale计算得到的ASHIFT = 2
// ASHIFT主要是用于内存寻址的时使用,假如我们要寻找第6桶位的地址,ABSE + 5 * scale
// 转化为 Abase + (5 << ASHIFT)
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
由上面的几个属性字段的复习介绍,不难得出:
((long)i << ASHIFT) + base 就是得到当前 Node[] 数组下标为 i 的节点对象的偏移地址。然后再通过 (Node 举例子分析一下: 当我们需要 table 容量从 16 扩容到 32的时候,Integer.numberOfLeadingZeros(16) 会得到 27,怎么得来的呢?
numberOfLeadingZeros(n) 根据传入的 n,返回当前数值转换为二进制后,从高位到低位开始统计,统计有多少个0连续再一块:
eg:16 转换二进制 => 1 0000 则 numberOfLeadingZeros(16) 的结果就是 27,因为 Integer 是32位,1 0000 占5位,那么前面就有 32 - 5 个 0,即连续最长的 0 个数为 27 个。 1 << (RESIZE_STAMP_BITS - 1) :其中 RESIZE_STAMP_BITS 是一个固定值 16,与扩容相关,计算扩容的时候会根据该属性值生成一个扩容标识戳。、下面就来计算一下:
至此,ConcurrentHashMap 的源码分析准备工作就完成了,之后会更新其中一些较为麻烦的方法。
static final
8.4、setTabAt(Node
static final
8.5、resizeStamp(int n) 方法
// table数组扩容给的时候,计算出一个扩容标识戳,当需要并发扩容的时候,当前线程必须拿到扩容标识戳才能参与到扩容中去
static final int resizeStamp(int n) {
// RESIZE_STAMP_BITS:固定值 16,与扩容相关,计算扩容的时候会根据该属性值生成一个扩容标识戳
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
// 从 16 扩容到 32
16 -> 32
numberOfLeadingZeros(16) => 1 0000 => 27 => 0000 0000 0001 1011
// 用 B 表示:
(1 << (RESIZE_STAMP_BITS - 1)) => (1 << (16 - 1)) => 1000 0000 0000 0000 => 32768
// A | B
Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1))
-----------------------------------------------------------------
0000 0000 0001 1011 ---> A
1000 0000 0000 0000 ---> B
------------------- ---> | 按位或
1000 0000 0001 1011 ---> 计算得到扩容标识戳
8.6、tableSizeFor(int c) 方法
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
8.7、构造方法
// 无参构造方法
public ConcurrentHashMap() {
}
// 指定初始化容量
public ConcurrentHashMap(int initialCapacity) {
// 判断给定的数组初始长度是否小于0,小于0的话直接抛出异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// sizeCtl>0
// 当size初始化的时候,sizeCtl表示初始化容量
this.sizeCtl = cap;
}
// 根据一个 Map 集合来初始化
public ConcurrentHashMap(Map extends K, ? extends V> m) {
// sizeCtl 设置为默认容量值
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
// 指定初始化容量和负载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
// 指定初始化容量,和负载因子,并发级别
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 当指定的初始化容量initialCapacity 小于并发级别 concurrencyLevel的时候
if (initialCapacity < concurrencyLevel) // Use at least as many bins
// 初始化容量的值设置为并发级别的值
// 即,JDK1.8之后并发级别由散列表长度决定
initialCapacity = concurrencyLevel; // as estimated threads
// 根据初始化容量和负载因子,去计算size
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// 根据size重新计算数组初始化容量
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}



