- JDK1.7
- JDK1.8
- 总结
- 参考资料
为什么使用 ConCurrentHashMap?
因为 HashMap 在多线程的情况下不安全。
HashTable 虽然是线程安全的,但是几乎在每个方法上加了 synchronized 锁,也就是在涉及到多线程操作中,会锁住整个数组table,只能由一个线程对 HashTable 进行操作,而其他的线程则会堵塞等待,从而导致性能较为低下。
ConCurrentHashMap 采用锁分段技术,操作更为细粒,提高了并发的效率。
在JDK1.7 的版本中,ConcurrentHashMap 的数据结构是通过 Segment数组 + HashEntry数组 + 链表来实现的。
锁分段技术是指,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
ConcurrentHashMap 的数据插入需要进行两次 Hash 去定位数据的存储位置
put 操作
从 Segment 的继承体系可以看出,Segment 实现了 ReentrantLock,也就带有锁的功能。
- 当执行 put 操作时,会进行第一次 key 的 hash 来定位 Segment 的位置;
a. 如果该 Segment 还没有初始化,即通过 CAS 操作进行赋值; - 然后进行第二次 hash 操作,找到相应的 HashEntry 的位置,这里会利用继承ReentrantLock 的 tryLock() 方法去尝试获取锁;
- 如果当前 Segment 中 HashEntry 结点个数超过阈值,就 rehash。
- 再次hash,确认结点需要放置在哪个链表,否则就将数据插入指定的 HashEntry 位置(链表的尾端);
- 在对应的链表中查找是否相同节点,如果有直接覆盖,如果没有将其放置链表尾部
如果已经有线程获取当前 Segment 的锁,那当前线程会以自旋的方式去继续的调用 tryLock() 方法去获取锁,超过指定次数就挂起,等待唤醒。
get 操作
ConcurrentHashMap 的 get操作跟 HashMap 类似,只是 ConcurrentHashMap 需要进行两次 Hash 去定位数据的存储位置。
size 操作
计算 ConcurrentHashMap 的元素大小是一个有趣的问题,因为他是并发操作的,就是在你计算 size 的时候,他还在并发的插入数据,可能会导致你计算出来的 size 和你实际的 size 有相差(在你return size的时候,插入了多个数据),要解决这个问题,JDK1.7版本用两种方案
- 第一种方案他会使用不加锁的模式去尝试多次计算 ConcurrentHashMap 的 size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的;
- 第二种方案是如果第一种方案不符合,他就会给每个 Segment 加上锁,然后计算ConcurrentHashMap 的 size 返回
在JDK1.8 的版本中
- ConcurrentHashMap 的数据结构是通过 数组+链表+红黑树来实现的。
- 并发控制使⽤ synchronized + CAS 来操作。
- Node节点中 value 和 next 都用 volatile 修饰,保证并发的可见性。
下面对 JDK1.8 中的 ConcurrentHashMap 基本属性进行基本的了解。
// 表最大的容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认表初始容量
private static final int DEFAULT_CAPACITY = 16;
// 可能的最大(非 2 的幂)数组大小。需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 此表的默认并发级别。未使用,只为兼容先前版本。
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 树化阈值
static final int TREEIFY_THRESHOLD = 8;
// 树转链表阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 最小树化容量
static final int MIN_TREEIFY_CAPACITY = 64;
// 扩容线程每次最少要迁移16个hash桶
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
// 可以帮助扩容的最大线程数。2^16 -1 = 65535
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// sizeCtl 中记录 size 大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// ForwardingNode 的 hash 值,表示正在扩容
static final int MOVED = -1; // hash for forwarding nodes
// 树根节点 hash 值
static final int TREEBIN = -2; // hash for roots of trees
// ReservationNode 的 hash值
static final int RESERVED = -3; // hash for transient reservations
// 普通节点的 hash 值可用位
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 可用 CPU 数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 数组table
transient volatile Node[] table;
private transient volatile Node[] nextTable;
private transient volatile long baseCount;
private transient volatile int sizeCtl;
private transient volatile int transferIndex;
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;
以上的基本属性都是 ConCurrentHashMap 的一些边界以及操作时的控制。
下面来看下内部的一些结构组成,这些是整个 ConCurrentHashMap 数据结构的核心。
Node:
Node 是ConcurrentHashMap 存储结构的基本单元,继承于 HashMap 中的 Entry,用于存储数据。
key 和 hash 都用 final 修饰来保证线程安全;
next指向和 value 是通过 volatile 修饰保证线程可见性。
static class Nodeimplements Map.Entry { final int hash; final K key; volatile V val; volatile Node next; // 。。。。。 }
TreeNode:
表示红黑树的节点。没有线程安全措施,但是主要靠 TreeBin(通过 volatile 修饰首结点和等待线程以及红黑树的锁状态)来保证线程安全的。
static final class TreeNodeextends Node { TreeNode parent; // red-black tree links TreeNode left; TreeNode right; TreeNode prev; // needed to unlink next upon deletion boolean red; // 。。。。。 }
TreeBin:
表示红黑树的根,放置在数组table 中。
TreeBin 结点用于添加元素时候的写锁的获取和释放。
static final class TreeBinextends Node { TreeNode root; // 红黑树的根节点 volatile TreeNode first; // 链表的头节点 // 等待者线程(当前lockState是读锁状态) volatile Thread waiter; // 最近一个设置 WAITER 标志位的进程 volatile int lockState; // 读写锁状态 static final int WRITER = 1; // 获取写锁的状态 static final int WAITER = 2; // 等待写锁的状态 static final int READER = 4; // 增加数据时读锁的状态 // 。。。。。 }
ForwardingNode:
一个特殊的 Node 结点, hash 为 -1 ,key 和 val 都为 null。
- 通过 final 修饰i新表来保证线程安全;
- 如果旧数组中的一个 hash表的结点全部迁移到了新的 table 中,则会在这个 hash表中放置一个 Forwarding Node;
- 读操作碰到 ForwardingNode 时,将操作转发到扩容后的新 table 数组上去执行;
- 写操作碰到 ForwardingNode 时,则尝试帮助扩容。
static final class ForwardingNodeextends Node { final Node [] nextTable; ForwardingNode(Node [] tab) { super(MOVED, null, null, null); this.nextTable = tab; } // 。。。。。 }
ReservationNode:
常用于 computeIfAbsent 和 compute 操作。
static final class ReservationNode extends Node {
ReservationNode() {
super(RESERVED, null, null, null);
}
Node find(int h, Object k) {
return null;
}
}
put 操作
put 操作主要流程如下。
- 如果没有初始化,就调用 initTable() 方法来进行初始化;
- 如果该位置没有元素(无 hash 冲突),则用 CAS 向该位置插入;
- 如果该位置有元素(存在 hash 冲突),就加锁来保证线程安全,使用 synchronized 加锁;
- 加锁后,判断该元素的类型
a. 如果是链表结点则尾插到链表尾端
b. 如果是红黑树则添加到红黑树中 - 添加成功后,判断是否需要进行树化;
- addCount() 统计 size,这个方法的意思是 ConcurrentHashMap 的元素个数加 1,但是这个操作也是需要并发安全的,并且元素个数加 1 成功后,会继续判断是否需要进行扩容。
- 同时一个线程在 put 时,如果发现当前 ConcurrentHashMap 正在进行扩容则会去帮助扩容
流程图待补充。
initTable()
private final Node[] initTable() { Node [] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
SizeCtl 为 volatile,线程可见。
- 首先判断是否有其他线程对当前数组table 进行初始化;
- 没有则 CAS去修改 sizeCtl 的值为 -1,修改成功,则当前线程去执行初始化table;
- 如果当前线程发现 sizeCtl 的值已经是 -1,则代表已有线程在初始化table,此时当前线程会调用 Thread.yield() 方法,表明当前线程放弃 cpu 的使用;
- …
后续…
get 、helpTransfer、addCount(baseCount、CounterCell[])、transfer(ForwardingNode、transferIndex、stride、bound)
Unsafe:
Unsafe 对象用于执行一些不安全,较为底层的操作。比如直接访问系统资源。
- compareAndSwapObject:通过 CAS 的方式修改对象的属性
- putOrderedObject:并发安全的给数组的某个位置赋值
- getObjectVolatile:并发安全的获取数组某个元素的值
Unsafe 主要负责并发安全的修改对象的属性或数组某个位置的值
synchronized 主要负责在需要操作某个位置时进行加锁(该位置不为空),比如向某个位置的链表/红黑树插入结点。
[1] ConcurrentHashMap 1.7和1.8区别:https://blog.csdn.net/xingxiupaioxue/article/details/88062163



