ConcurrentHashMap是Java1.5中引用的使用了分段锁机制的一个线程安全的支持高并发的HashMap集合类
简而言之,ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个Hashtable;这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。因此,ConcurrentHashMap在多线程并发编程中可是实现多线程put操作。
总结
大概意思就是在hashmap上多加了一层,原来的hashmap加锁整个表都锁住了,十分影响效率,现在用Segment数组,每个位置上都可以放个hash表,然后对Segment加锁,不影响其他位置上的hash表
背景HashMap线程不安全
多线程环境下,使用Hashmap进行put操作会引起死循环,所以在并发情况下不能使用HashMap。
HashTable效率低下
HashTable为保证线程安全付出的代价太大,get()、put()等方法都是synchronized的,这相当于给整个哈希表加了一把大锁。在并发调用HashTable的方法时就会造成大量的时间损耗。
- HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。
- 因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。
- 如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。
锁分段技术
- HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁,**那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,**从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。
- 首先将数据分成一段一段**(Segment数组)**的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。
存储结构对比图
- ConcurrnetHashMap 由很多个 Segment 组合,而每一个 Segment 是一个类似于 HashMap 的结构
- 每一个 HashMap 的内部可以进行扩容。但是 Segment 的个数一旦初始化就不能改变
- 默认 Segment 的个数是 16 个,你也可以认为 ConcurrentHashMap 默认支持最多 16 个线程并发。
public class ConcurrentHashMapSegment的定义extends AbstractMap implements ConcurrentMap , Serializable { static final int DEFAULT_INITIAL_CAPACITY= 16; static final float DEFAULT_LOAD_FACTOR= 0.75f; static final int DEFAULT_CONCURRENCY_LEVEL= 16; //最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; //每个segment中的最小容量,至少为2.避免当next指针使用时,需要立即resize static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大分段数 static final int MAX_SEGMENTS = 1 << 16; //在重排序锁定之前,非同步化尝试调用size等方法的次数,避免无限制的尝试 static final int RETRIES_BEFORE_LOCK = 2; //与实例相关联的一个随机值,用于哈希键的哈希码,使哈希冲突更难找到 private final transient int hashSeed = randomHashSeed(this); //段的掩码值,通过它进行与运算来定位段下标 final int segmentMask; //偏移量,用来确定哈希值中参与段定位的高位的位数 final int segmentShift; // 段,每个段都是一个专门的哈希表 final Segment [] segments; }
继承接口
-
继承了ReentrantLock类,ReentrantLock和synchronized都是可重入的独占锁,只允许线程互斥的访问临界区,这意味着每个Segment都可以当做一个锁,每把锁只锁住整个容器中的部分数据,这样不影响线程访问其他的数据,当然如果是对全局改变时会锁定所有的Segment段。
-
它实现了Serializable接口,可进行对象的序列化与反序列化。
private static final long serialVersionUID = 2249069246763182397L; //最大尝试次数达到限制进入加锁等待状态(对最大尝试次数,目前的实现单核次数为1,多核为64) static final int MAX_SCAN_RETRIES = (Runtime.getRuntime().availableProcessors() > 1) ? 64 : 1; //segment内部的哈希表,访问HashEntry,通过具有volatie的entryAt、setEntryAt方法 volatile transient ConcurrentHashMap.HashEntryHashEntry类[] table; //Segment的哈希表中的HashEntry的数量 transient int count; //修改次数 transient int modCount; //哈希表中的扩容阈值 transient int threshold; //哈希表中的负载因子,所有的Segments中的这个值相等,这么做是为了避免需要连接到外部Object final float loadFactor; Segment(float paramFloat, int paramInt, ConcurrentHashMap.HashEntry [] paramArrayOfHashEntry){ this.loadFactor = paramFloat; this.threshold = paramInt; this.table = paramArrayOfHashEntry; }
与HashMap的Entry作用一致,只是使用了Unsafe来提高读写速度
static final class HashEntry构造函数 无参构造:{ final int hash; final K key; volatile V value; volatile HashEntry next; HashEntry(int hash, K key, V value, HashEntry next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } // 在本对象的next成员的偏移地址处放入n final void setNext(HashEntry n) { UNSAFE.putOrderedObject(this, nextOffset, n); } static final sun.misc.Unsafe UNSAFE; // next成员的偏移地址 static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class; // 得到next成员在对象中的偏移量,用来进行链接操作 // 利用UnSafe类可以通过直接操作内存来提高速度 nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
调用了有参构造
//传入了默认初始化容量(16),默认负载因子(0.75f),默认并发级别(16)
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
有参构造:
- initialCapacity: 初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。
- loadFactor: 负载因子,之前我们说了,Segment 数组不可以扩容,所以这个负载因子是给每个 Segment 内部使用的,也是。
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
// 参数校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// concurrencyLevel:并发水平,即,Segment分段数,不能超过最大段数
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// 2的多少次方
int sshift = 0; //记录偏移量
int ssize = 1; //记录concurrencyLevel之上的最小2的幂
// 这个循环可以找到 concurrencyLevel 之上最近的 2的次方值
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 记录段偏移量,用于后面计算下标
this.segmentShift = 32 - sshift;
// 记录段掩码
this.segmentMask = ssize - 1;
// initialCapacity 是设置整个 map 初始的大小,
// 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小
// 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// c = 容量 / ssize ,默认 64 / 16 = 4,这里是计算每个 Segment 中的hash表的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,
// 插入一个元素不至于扩容,插入第二个的时候才会扩容
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//Segment 中的类似于 HashMap 的容量至少是2或者2的倍数
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 创建 Segment 数组,设置 segments[0]
Segment s0 = new Segment(loadFactor, (int)(cap * loadFactor),(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize];
// 往数组写入 segment[0]
UNSAFE.putOrderedObject(ss, Sbase, s0);
this.segments = ss;
}
put过程
public V put(K key, V value) {
Segment s;
//不能放空值,不然抛出异常
if (value == null)
throw new NullPointerException();
// 1. 计算 key 的 hash 值
int hash = hash(key);
// 2. 根据 hash,segmentShift,segmentMask值找到 Segment 数组中的位置 j
int j = (hash >>> segmentShift) & segmentMask;
//初始化的时候只初始化了 segment[0],但是其他位置还是 null,
// SSHIFT :Segment数组每个元素的偏移量,Sbase :Segment数组第一个元素的偏移量
// 由上面两个偏移量可以算出目标下标元素的物理地址
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + Sbase)) == null) // in ensureSegment
// ensureSegment(j) 对 segment[j] 进行初始化
s = ensureSegment(j);
// 3. 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}
//已经找到了对应的segment数组下标,接下来要操作hash表
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 在往该 segment 写入前,需要先获取该 segment 的独占锁
// 先看主流程,后面还会具体介绍这部分内容
HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 这个是 segment 内部的数组
HashEntry[] tab = table;
// 再利用 hash 值,求应该放置的数组下标
int index = (tab.length - 1) & hash;
// 获取数组该位置处的链表的表头赋值给first
HashEntry first = entryAt(tab, index);
// 下面这串 for 循环虽然很长,不过也很好理解,想想该位置没有任何元素和已经存在一个链表这两种情况
for (HashEntry e = first;;) {
//e不等于空说明,表头有结点,遍历链表
if (e != null) {
K k;
//判断hash值和key是否和结点的相等
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
//存旧值
oldValue = e.value;
if (!onlyIfAbsent) {
//覆盖旧值
e.value = value;
++modCount;
}
//跳出循环
break;
}
// 继续顺着链表走
e = e.next;
}
else {
// node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
// 如果不为 null,node的next指针指向链表表头;如果是null,初始化并设置为链表表头。
if (node != null)
node.setNext(first);
else
node = new HashEntry(hash, key, value, first);
int c = count + 1;
// 如果超过了该 segment 的阈值,这个 segment 需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 扩容后面也会具体分析
else
//没有达到阈值,将 node 放到数组 tab 的 index 位置,
//其实就是将新的节点设置成原链表的表头
setEntryAt(tab, index, node);//头插法
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解锁
unlock();
}
return oldValue;
}
初始化槽: ensureSegment
ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。
这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。
private Segment获取写入锁: scanAndLockForPutensureSegment(int k) { //指向当前的Segment数组 final Segment [] ss = this.segments; long u = (k << SSHIFT) + Sbase; // raw offset Segment seg; if ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { // 这里看到为什么之前要初始化 segment[0] 了, // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k] // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了 Segment proto = ss[0]; //容量 int cap = proto.table.length; //负载因子 float lf = proto.loadFactor; //计算扩容阈值 int threshold = (int)(cap * lf); // 初始化 segment[k] 内部的数组 HashEntry [] tab = (HashEntry [])new HashEntry[cap]; if ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { // 检查该槽是否被其他线程初始化了。 //初始化 Segment s = new Segment (lf, threshold, tab); // 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出 while ((seg = (Segment )UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
其中的核心思想就是通过MAX_SCAN_RETRIES控制自旋次数,防止无限制的重复自旋浪费资源。这个方法很显然见名知意,它的作用就是遍历获取锁然后进行数据插入,Segment中还有一个和这个方法十分类似的scanAndLock方法,它的实现思想和这个方法基本一致,不过这里的scanAndLockForPut主要是用在数据插入中,而scanAndLock则主要用在remove和replace方法中。接下来对Segment的put方法进行分析。
自旋锁
自旋锁是指尝试获取的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是会消耗CPU性能
private HashEntry扩容: rehashscanAndLockForPut(K key, int hash, V value) { //获取链表头结点,相当于找到桶的位置 HashEntry first = entryForHash(this, hash); //保存头结点 HashEntry e = first; HashEntry node = null; int retries = -1; // 定位节点时为负(原注释) //如果尝试加锁失败,那么就对segment[hash]对应的链表进行遍历找到需要put的这个entry所在的链表中的位置, //这里之所以进行一次遍历找到坑位,主要是为了通过遍历过程将遍历过的entry全部放到CPU高速缓存中, //这样在获取到锁了之后,再次进行定位的时候速度会十分快,这是在线程无法获取到锁前并等待的过程中的一种预热方式。 while (!tryLock()) { HashEntry f; // to recheck first below //获取锁失败,初始时retries=-1必然开始先进入第一个if if (retries < 0) {//<1> //e=null代表两种意思, //1.第一种就是遍历链表到了最后,仍然没有发现指定key的entry; //2.第二种情况是刚开始时entryForHash(通过hash找到的table中对应位置链表的结点)找到的HashEntry就是空的 if (e == null) { //<1.1> if (node == null) // speculatively create node node = new HashEntry (hash, key, value, null); retries = 0; } else if (key.equals(e.key))//<1.2> 遍历过程发现链表中找到了我们需要的key的坑位 retries = 0; else//<1.3> 当前位置对应的key不是我们需要的,遍历下一个 e = e.next; } else if (++retries > MAX_SCAN_RETRIES) {//<2> MAX_SCAN_RETRIES为64 // 尝试获取锁次数超过设置的最大值,直接进入阻塞等待,这就是所谓的有限制的自旋获取锁, //之所以这样是因为如果持有锁的线程要过很久才释放锁,这期间如果一直无限制的自旋其实是对cpu性能有消耗的, //这样无限制的自旋是不利的,所以加入最大自旋次数,超过这个次数则进入阻塞状态等待对方释放锁并获取锁。 lock(); break; } // 遍历过程中,有可能其它线程改变了遍历的链表,这时就需要重新进行遍历了。 //判断是否初始化了结点 并且 判断链表头结点是否改变(1.7使用头插法) else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {//<3> //重新设置first和e e = first = f; // re-traverse if entry changed //重新设置 retries = -1; } } return node; }
对数组进行扩容,由于扩容过程需要将老的链表中的节点适用到新数组中,所以为了优化效率,可以对已有链表进行遍历,对于老的oldTable中的每个HashEntry,从头结点开始遍历,找到第一个后续所有节点在新table中index保持不变的节点fv,假设这个节点新的index为newIndex,那么直接newTable[newIndex]=fv,即可以直接将这个节点以及它后续的链表中内容全部直接复用copy到newTable中,这样最好的情况是所有oldTable中对应头结点后跟随的节点在newTable中的新的index均和头结点一致,那么就不需要创建新节点,直接复用即可。最坏情况当然就是所有节点的新的index全部发生了变化,那么就全部需要重新依据k,v创建新对象插入到newTable中。
// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。 private void rehash(HashEntryget 过程分析node) { //旧表指针oldTable HashEntry [] oldTable = table; //保存旧容量 int oldCapacity = oldTable.length; //新容量为2 倍 int newCapacity = oldCapacity << 1; //重新计算扩容阈值 threshold = (int)(newCapacity * loadFactor); // 创建新数组 HashEntry [] newTable = (HashEntry []) new HashEntry[newCapacity]; // 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’ int sizeMask = newCapacity - 1; // 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置 for (int i = 0; i < oldCapacity ; i++) { // e 是链表的第一个元素 HashEntry e = oldTable[i]; if (e != null) { //辅助指针 HashEntry next = e.next; // 计算应该放置在新数组中的位置, //新位置为 原下标或者原下标+旧容量 int idx = e.hash & sizeMask; if (next == null) // 该链表处只有一个元素,那比较好办 //直接在新表的对应位置放入结点 newTable[idx] = e; else { // e 是链表表头 赋值给lastRun HashEntry lastRun = e; // idx 是当前链表的头结点 e 的新位置 int lastIdx = idx; // 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的 //遍历后续链表(头表的下一个结点) for (HashEntry last = next;last != null; last = last.next) { //计算新位置 int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } // 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置 newTable[lastIdx] = lastRun; // 下面的操作是处理 lastRun 之前的节点, //这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中 for (HashEntry p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry n = newTable[k]; newTable[k] = new HashEntry (h, p.key, v, n); } } } } // 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
- 计算 hash 值,找到 segment 数组中的具体位置,或我们前面用的“槽”
- 槽中也是一个数组,根据 hash 找到数组中具体的位置
- 到这里是链表了,顺着链表进行查找即可
public V get(Object key) {
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
// 1. 计算hash 值
int h = hash(key);
// h >>> segmentShift : 绝对右移,相当于hash值的高n位参加了段的定位
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + Sbase;
// 2. 根据 hash 找到对应的 segment
// UNSAFE.getObjectVolatile()得到该数组物理地址下d 对象
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 3. 找到segment 内部数组相应位置的链表,遍历
// (tab.length - 1) & h : 使用全部散列值进行元素的下标定位
// TSHIFT :哈希表每个元素的偏移量,Tbase :哈希表第一个元素的偏移量
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + Tbase);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
remove方法
public V remove(Object key) {
int hash = hash(key);
//根据散列值取得目标段
Segment s = segmentForHash(hash);
0return s == null ? null : s.remove(key, hash, null);
}
public boolean remove(Object key, Object value) {
int hash = hash(key);
Segment s;
return value != null && (s = segmentForHash(hash)) != null &&
//调用Segment里的方法
s.remove(key, hash, value) != null;
}
并发性问题分析
- get()方法中没有加任何的锁,使用CAS的方式来尽可能保证线程安全,但是存在若一致性问题。添加节点到链表的操作时插入到表头的,所以如果这个时候get操作在链表的过程中已经到了中间是不会被影响的。另一个并发问题就是get操作在put之后,需要保证刚刚插入表头的节点被读取,这个依赖于setEntryAt方法中使用的UNSAFE.putOrderedObject.
- 扩容:扩容是新创建了数组,然后进行迁移数据,最后将newTable设置给属性table, 所以如果get操作此时也在进行,那么也没关系,如果get先行,那么就是在旧的table上做查询操作;而put先行,那么put操作的可见性保证就是table使用了volatile关键字.



