ConcurrentHashMap的实现原理
ConcurrentHashMap 在 JDK1.7 和 JDK1.8 的实现方式是不同的
JDK1.7下的ConcurrentHashMap
实现原理是数组加链表,jdk1.7下,ConcurrentHashMap由segment 数组结构和 hashEntry 数组结构构成的,每个hashEntry相当于HashMap 中的table数组,即 ConcurrentHashMap 把哈希桶数组切分成小数组(Segment ),每个小数组有 n 个 HashEntry 组成。
每个segment配备一把锁,当一个线程访问其中一段数据时,就可以给这段segment 进行上锁,这样在保证segment锁住的同时而不影响其他线程访问其他的segment,实现了真正的并发访问。
Segment 是 ConcurrentHashMap 的一个内部类,主要的组成如下:Segment继承了ReentrantLock,因此Segment 是一种可重入锁,Segment 的默认值为16,即并发度为16。
存放元素的HashEntry也是一个静态内部类,其中用volatile 修饰了 HashEntry 的数据 value 和 下一个节点 next,保证了多线程环境下数据获取时的可见性!
JDK1.8下的ConcurrentHashMap
JDK1.8 中的ConcurrentHashMap 选择了与 HashMap 相同的Node数组+链表+红黑树结构;在锁的实现上,抛弃了原有的 Segment 分段锁,采用CAS + synchronized实现更加细粒度的锁。
1.8下,ConcurrentHashMap将锁的级别控制在了更细粒度的哈希桶级别,只要锁住这个桶位置的头节点,就不会影响其他桶数组元素的读写,大大提升了并发度。
JDK1.8 中为什么使用内置锁 synchronized替换 可重入锁 ReentrantLock?
1、在jdk1.6中,对synchronized锁的实现进行了大量的优化,支持多种锁状态,会从无锁->偏向锁->轻量级锁->重量级锁一步步的升级转换。
2、减少内存的开销,假设使用可重入锁来获得同步支持,那么每个节点都需要继承AQS来获得同步支持,但是实际上,只有链表的头节点或者红黑树的根节点才需要同步,带来了巨大的内存浪费。
ConcurrentHashMap的源码解析
JDK1.7下的ConcurrentHashMap的源码
1、初始化:
initialCapacity :初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。
比如你初始容量是 64,Segment 的容量为16,那么每个段中哈希表的初始容量就为 64/16=4。
loadFactor:这个负载因子是给 段中哈希表 扩容时候使用的
Segment数组的长度是不可以被改变的,初始化如果不规定,那么就采用默认的 16
从源码中发现,创建ConcurrentHashMap的时候,在Segment第一个位置初始化了一个HashEntry,其他位置则没有,这是因为每次创建一个Segment对象的时候,要进行大量的初始化工作,首先初始化一个s0后,后面只要在涉及到创建Segment对象,只要 拿s0里面的数据当做模板就可以,这样会增加一定的效率。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 非法参数判断。
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 并行级别越界重设。
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments 找到2次幂大小的最佳匹配参数.
int sshift = 0; // 偏移量吧因该是。
int ssize = 1; // segment的数组长度。
// 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方
// 如果这里我们的并行级别是16的话。
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 到这一步,sshift = 4, ssize = 16。
// 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值
this.segmentShift = 32 - sshift; // 段偏移量
this.segmentMask = ssize - 1; // 段掩码
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// initialCapacity 是设置整个 map 初始的大小,
// 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小。
// 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个。
// 那么此时段中哈希表的容量就为 4。
int c = initialCapacity / ssize; // c 用于
// 如果段中容量与segment的数组长度乘积小于了整个 ConcurrentHashMap 的初始容量,
// 那我们就把目标容量(段中哈希表容量) + 1;
if (c * ssize < initialCapacity)
++c;
// 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,
// 插入一个元素不至于扩容,插入第二个的时候才会扩容
int cap = MIN_SEGMENT_TABLE_CAPACITY; //先给此时的段中容量默认为 2 。
// 如果段中默认容量小于目标容量的话就一直扩大到段中默认容量等于目标容量。
while (cap < c)
cap <<= 1;
// 创建段数组的第一个元素 segment[0]
Segment s0 =
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
// 创建段数组,数组长度就为 ssize 。
Segment[] ss = (Segment[])new Segment[ssize];
// 往段数组中有序的写入 segment[0]。
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
2.put操作,这个方法主要代码就是定位到相应的Segment,然后针对这个Segment进行put操作;
public V put(K key, V value) {
// 先建一个段的临时桶。
Segment s;
// 如果要put的值为空的话就抛出异常。
if (value == null)
throw new NullPointerException();
// 计算 key 的 hash 值
int hash = hash(key);
// 根据 hash 值找到段表中的位置 j。
// hash 是 32 位,无符号右移 segmentShift(28) 位,剩下高 4 位,
// 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的高 4 位,也就是槽的数组下标
int j = (hash >>> segmentShift) & segmentMask;
// 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null。
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j); // 初始化 j 处的桶。
// 插入新值到 s 中。
return s.put(key, hash, value, false);
}
首先看一下ensureSegment()这个方法;
在put方法中,首先尝试获取j位置的桶对象,如果这个位置桶对象不为null,那么就直接执行后续的put操作,如果为空,则进入ensureSegment方法
看到这个方法里面,还是不断的通过UNSAFE.getObjectVolatile(ss, u)这个方法去判断对应坐标位置的Segment对象被其他线程创建了,不断的去判断是因为创建Segment对象需要初始化大量的属性,如果在cas创建Segment之前仅做一次判断,那么会浪费内存空间。
确保Segment对象为null时,就通过CAS来创建Segment对象,这个操作是原子性的
最终方法返回seg对象,不管这个seg对象是哪个线程创建的,只要被创建了,就把他返回。
private SegmentensureSegment(int k) { // 临时段表。 final Segment [] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment seg; // 注意,这里是getObjectVolatile这个方法,这个方法的意思就是,别的线程要是修改了segment[k],这个线程是可见的。 if ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { // 这里看到为什么之前要初始化 segment[0] 了, // segment[0] 就相当于一个初始化模板, // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k], // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了。 Segment proto = ss[0]; // 获取当前 segment[0] 作为初始化模板。 int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 内部的哈希表。 HashEntry [] tab = (HashEntry [])new HashEntry[cap]; // 再次检查一遍该槽是否被其他线程初始化了。 // 也就是在做上面那些操作时,看看是否有别的线程操作过 segment[k]。 if ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { Segment s = new Segment (lf, threshold, tab); // 构建新的 segment 对象。 // 再次检查 segment [k] 是否为null。 // 注意,这里是while,之前的if,也是起到如果下面操作失败,再次检查的作用。 while ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) // CAS操作,这里的比较并交换在CPU里面就是一条指令,保证原子性的。 // 不存在那种比较完毕之后的间隙,突然切换到别的线程来修改这个值的情况。 break; } } } // 返回 segment 对象。 // 这里返回的seg可能是自己new的,也可能是别的线程new的,反正只要其中一个就好了。 return seg; }
得到Segment对象后,就可以对这个Segment对象内部的HashEntry执行put操作了
put操作首先获取锁,如果获取成功,就尝试put了,put的过程和hashmap 其实基本是一样的,如果key相同,就替换值,key不相同,就尝试插入,插入前还要判断是否需要扩容等,最后采用头插法插入,然后解锁。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 再 put 到指定段中之前,我们得获取到当前段表中这个桶的独占的锁。
// 以此来保证整个过程,只有我们一个线程在对这个桶做操作。
HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue; // 用来存储被覆盖的值。
try {
// 这个是段表某个桶内部的哈希表。
HashEntry[] tab = table;
// 再次利用待插入键值对 key 的 hash 值,求应该放置的数组下标。
int index = (tab.length - 1) & hash;
// first 是桶中哈希表的待插入桶处的链表的表头。
HashEntry first = entryAt(tab, index);
// 遍历链表。
for (HashEntry e = first;;) {
// 这个遍历主要是为了找出,key重复的情况。
if (e != null) {
K k;
// 如果当前链表节点的key重复了的话。
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k)))
oldValue = e.value;
if (!onlyIfAbsent) {
// 覆盖旧的值。
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
// 如果不为 null。
if (node != null)
// 那就把它设置为链表表头,JDK1.7使用头插法。
node.setNext(first);
else
// 否则如果是null,就先初始化它再设置为链表表头。
node = new HashEntry(hash, key, value, first);
int c = count + 1;
// 如果超过了段表中该桶中哈希表的阈值,这个哈希表就需要扩容。
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 扩容。
else
// 没有达到阈值,将 node 放到哈希表的 index 位置,
// 其实就是将新的节点设置成原链表的表头,使用头插法。
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解锁
unlock();
}
return oldValue;
}
scanAndLockForPut()获取写入锁
首先尝试获取锁,如果未获取到,则执行scanAndLockForPut方法,这个方法就是不断的循环获取锁,如果获取失败了,就进行一些准备工作。(语气自旋浪费CPU性能,不如趁着这个功夫去做一些准备工作)
(retries & 1) == 0这个判断代表重试次数为偶数的时候进行后面的判断,为什么会有这个设计呢?我想可能是因为后面的代码执行也需要时间,有可能在执行后续代码的时候,此线程失去了获取锁的机会,造成锁饥饿吧。
重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁,避免cpu空转。
private HashEntryJDK1.8下的ConcurrentHashMap的源码scanAndLockForPut(K key, int hash, V value) { // 简单来说就是拿到段表中某个桶中的哈希表数组中通过hash计算的那个下标下的第一个节点。 HashEntry first = entryForHash(this, hash); // 备份一下当前节点的内容。 HashEntry e = first; // 辅助变量。 HashEntry node = null; int retries = -1; // 用于记录获取锁的次数。 // 循环获取锁。 // 如果获取失败,就会去进行一些准备工作。 while (!tryLock()) { // 辅助变量用于重复检查, // 用来检查对应段表中那个桶上的哈希表数组中对应索引桶处,之前取出来的第一个节点是否还是我们之前取得那个。 HashEntry f; // to recheck first below // 准备工作。 // 因为准备工作也不需要每次循环都去做对吧,最好的预期,做一次准备工作就够了。 if (retries < 0) { // 判断段表中对应桶中的哈希表的对应桶上的节点 HashEntry 是不是还没被初始化。 if (e == null) { if (node == null) // speculatively create node // 进到这里说明数组该位置的链表是空的,没有任何元素。 // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置。 // 将我们即将插进去的元素,构建成一个HashEntry节点对象。 node = new HashEntry (hash, key, value, null); // 将 retries 赋值为0,不让准备工作重复执行。 retries = 0; } // 否则的话,判断 key 是否有重复的。 else if (key.equals(e.key)) // 将 retries 赋值为0,不让准备工作重复执行。 retries = 0; else // 否则顺着链表往下走。 e = e.next; } // 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁,避免cpu空转。 // lock() 是阻塞方法,直到获取锁后返回。 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && // 偶数次数才进行后面的判断。 // 这个时候出现问题了,那就是有新的元素进到了链表,成为了新的表头。 // 也可以说是链表的表头被其他线程改变了。 // 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法。 (f = entryForHash(this, hash)) != first) { // 此时怎么做呢, // 别的线程修改了该segment的节点,重新赋值e和first为最初值,和第一二行代码一样的效果。 e = first = f; // re-traverse if entry changed retries = -1; } } // 将准备工作制作好的节点返回。 return node; }
jdk1.8下,ConcurrentHashMap摒弃了segment分段锁的概念,并沿用了同时期hashMap的思想,底层采用“数组+链表+红黑树”的结构,为了做到并发,同时增加了很多辅助类,如TreeBin,Traverser 等内部类
1、ConcurrentHashMap内部重要 的一些属性和内部类
sizeCtl:ConcurrentHashMap中的一个控制符
- -1:代表正在初始化
- -(1+n): 代表有n和线程正在进行扩容操作
- 正数或0 代表hash表还未被初始化,这个数值表示初始化或者下一次进行扩容的大小(类似于扩容阈值,后面可以看到他的值正数时始终是当前容量的0.75倍)
private transient volatile int sizeCtl;
Node 类 :最核心的内部类,包装了键值对,与HashMap中的定义类似,但是他对value和next属性用volatile进行了修饰,包装了可见性
TreeNode类:树节点类,链表过长的时候,转为TreeNode,但是与HashMap不同的是,并不会直接转换为红黑树,认识将节点包装成TreeNode,放在TreeBin对象中,然后由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry
TreeBin类 : 负责包装TreeNode节点,代替了TreeNode的根节点,也就是说ConcurrentHashMap的数组中存放的是TreeBin对象,而不是TreeNode对象。(这个点和HashMap是不一样的,因为在多线程并发下,取的锁是table[i]的第一个元素,如果是存放的是红黑树的话,那么取的就是这个 红黑树的根节点,但是红黑树插入的过程中根节点是可能会改变的,即锁也会发生变化,因此ConcurrentHashMap将TreeBin对象代替红黑树的根节点,这样就不会发生锁对象变化的情况。)
ForwardingNode类:用于连接两个table的节点类,包含一个nextTable指针,用于指向下一张表。这个节点的 key value next 指针全为null,hash值为-1;其中的find方法是从nextTable例查询节点,而不是以自身为头节点进行查找。
2、ConcurrentHashMap内部重要 的一些方法
- tabAt(Node
[] tab, int i) :获取在i位置上的node节点 - casTabAt(Node
[] tab, int i,Node c, Node v):利用CAS算法设置i位置上的Node节点 - setTabAt(Node
[] tab, int i, Node v):利用volatile设置节点位置的值
@SuppressWarnings("unchecked")
static final Node tabAt(Node[] tab, int i) {
return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final boolean casTabAt(Node[] tab, int i,
Node c, Node v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final void setTabAt(Node[] tab, int i, Node v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
3、构造器
1.8里面,调用构造器只是针对传入的值对一些属性进行赋值,并没有构造table数组等,table数组的初始化是在向ConcurrentHashMap中插入元素的时候发生的。
4、putVal 方法:其实和HashMap 是很相似的,只不过增加了一些同步的操作。
大致步骤为:
- 校验key和value的值,这两个值都不能为null,而hashmap中的key和value都是可以为null的
- 死循环并更新tab数组的值,如果容器没有初始化,就调用initTable 方法进行初始化
- 根据计算的hash值找到数组下标,如果对应的位置为空,就创建一个Node对象用CAS方式添加进去,跳出循环
- 如果产生hash冲突,则进行添加操作(链表或者红黑树),这个时候,头节点是被锁住的,保证了同一时间只能有一个线程对这个链表或者红黑树进行修改
- 判断是否需要从链表转为红黑树(树化)
- 判断是否需要扩容
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
// 死循环执行
for (Node[] tab = table;;) {
Node f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 初始化
tab = initTable();
// 获取对应下标节点,如果是kong,直接插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// CAS 进行插入
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果 hash 冲突了,且 hash 值为 -1,说明是 ForwardingNode 对象(这是一个占位符对象,保存了扩容后的容器)
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 如果 hash 冲突了,且 hash 值不为 -1
else {
V oldVal = null;
// 同步 f 节点,防止增加链表的时候导致链表成环
synchronized (f) {
// 如果对应的下标位置 的节点没有改变
if (tabAt(tab, i) == f) {
// 并且 f 节点的hash 值 不是大于0
if (fh >= 0) {
// 链表初始长度
binCount = 1;
// 死循环,直到将值添加到链表尾部,并计算链表的长度
for (Node e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
if ((e = e.next) == null) {
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
// 如果 f 节点的 hasj 小于0 并且f 是 树类型
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 链表长度大于等于8时,将该节点改成红黑树树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 判断是否需要扩容
addCount(1L, binCount);
return null;
}
5、initTable 方法
初始化table方法主要是应用了属性sizeCtl ,如果sizeCtl <0,标识其他线程挣扎进行初始化,当前线程就放弃此次操作(初始化只能由一个线程操作)。如果获得了初始化的权限,就用CAS的方法将sizeCtl设置为-1,防止其他线程进入再次初始化,当前线程初完成数组的初始化后,将sizeCtl值设置为当前数组长度的0.75.
private final Node[] initTable() {
Node[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 小于0说明被其他线程改了
if ((sc = sizeCtl) < 0)
// 自旋等待
Thread.yield(); // lost initialization race; just spin
// CAS 修改 sizeCtl 的值为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// sc 在初始化的时候用户可能会自定义,如果没有自定义,则是默认的
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 创建数组
Node[] nt = (Node[])new Node,?>[n];
table = tab = nt;
// sizeCtl 计算后作为扩容的阀值
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
6、transfer扩容方法
- 计算CPU核心数和Map数组长度得到每个线程需要处理多少个桶,并且每个线程处理的桶数量都是平均的,默认每个线程处理16个桶,因此如果长度只有16,那么就只有一个线程扩容。
- 初始化临时变量nextTable,在原有的基础上扩容两倍
- 死循环开始转移:多线程并发转移就在这个死循环中,根据finishing变量进行判断,为true表示扩容结束,否则继续扩容
面试被问到 ConcurrentHashMap答不出 ,看这一篇就够了!_Java烂猪皮V的博客-CSDN博客
ConcurrentHashMap源码分析(JDK8版本)_小小旭GISer的博客-CSDN博客_concurrenthashmap源码分析
理解Java7和8里面HashMap+ConcurrentHashMap的扩容策略



