在并发编程中使用HashMap可能会导致循环引用和数据丢失的问题,而线程安全的HashTable容器由于使用了synchronnized,效率又非常低下,基于以上两个原因便有了ConcurrentHashMap
2.ConcurrentHashMap使用了分段锁技术的介绍?HashTable 容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable 的线程都必须竞争同一把锁。假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是 ConcurrentHashMap 所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
3.ConcurrentHashMap的结构ConcurrentHashMap由Segment数组结构和HashEntry数组结构组成,Segment是一种可重入锁(ReetrantLock),HashEntry 则用于
存储键值对数据,与HashMap中的Entry类似,但是它的value和next都加了volatile修饰,并且hash值也用了final修饰,也就是无法更改一个HashEntry对象的hash值
static class Entryimplements Map.Entry { final K key; V value; Entry next; int hash;
static final class HashEntry4.ConcurrentHashMap的构造函数分析 (1)变量含义解析{ final int hash; final K key; volatile V value; volatile HashEntry next;
//initialCapacity 初始容量 指全部HashEntry[]的容量和
//loadFactor 负载因子 默认0.75
//concurrencyLevel 并发级别(并发度),通过它来控制Segment数组的容量(能够划分的段数),也就是控制可以有多少个线程并发操作HashEntry[], 默认16
//MAX_SEGMENTS 规定了Segment数组的最大容量,也就是最多能分成多少段 默认1 << 16(2^16)
//ssize 用来表示Segment[]的容量大小
// sshift 记录ssize右移的位数,也就是segment[]容量是2的几次幂
// segmentShift表示segment[]容量的二进制前导0的个数
// segmentMask 表示segment数组容量减1
// cap表示每个段内数组容量的大小
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
-----------------①---------------------
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
-----------------②--------------------
int c = initialCapacity / ssize;
------------------③--------------------
if (c * ssize < initialCapacity)
++c;
-----------------④---------------------
in}t cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
------------------⑤---------------------
// create segments and segments[0]
Segment s0 =
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize];
-------------------⑥---------------------
//Class sc = Segment[].class;
//Sbase = UNSAFE.arraybaseOffset(sc);
// Sbase表示数组中第一个元素的偏移地址
UNSAFE.putOrderedObject(ss, Sbase, s0); // ordered write of segments[0]
this.segments = ss;
(2)执行过程分析
由于Segment[]和每个段内HashEntry[]的容量都必须是2的幂次方,这样就可以保证通过hash&(数组长度-1)获取到的索引值落在数组长度范围内,并且索引值由hash值来决定(与hashmap中相同)
①的位置首先判断segment数组长度是否达到了并发度,如果没有达到的话可以,通过左移1位增加数组长度,直到大于等于concurrencyLevel
②获取到segment数组的容量后,需要计算出每个segment段内,应该放容量为多少的HashEntry数组,通过int c = initialCapacity / ssize,能够获得,但是因为 / 为整除运算,所以获得的数组容量一定是舍去了小数部分的,假设当initialCapacity为17而ssize为16的话,c=1,就会丢失一个容量
③通过if (c * ssize < initialCapacity)判断是否刚好可以整除,如果可以则证明不会出现丢失容量的情况;如果不可以,即c * ssize < initialCapacity,则证明有小数部分的存在,有容量的丢失,使c++
④虽然保证了HashEntry[]数组的初始总容量>=我们传入的初始容量,但是却不能保证每个段内的HashEntry[]数组容量为2的幂次方,所以利用cap <<= 1;获得2的幂次方的HashEntry[]数组容量
⑤创建Segment对象和Segment
因为segment的构造器中有一些赋值操作,又由于Segment
Segment(float lf, int threshold, HashEntry[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; }
⑥调用unsafe类的putOrderedObject方法,将造好的segment对象s0,放入数组的起始位置
---------------------------------------------Unsafe类--------------------------------------------------
Unsafe类通过JNI的方式访问本地的C++实现库从而使java具有了直接操作内存空间的能力,Unsafe提供了硬件级别的原子性操作
public final class Unsafe {
// 在静态方法中已经初始化好了
private static final Unsafe theUnsafe;
.......
private Unsafe() {
}
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (var0.getClassLoader() != null) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
......
}
1.unsafe类不能直接new,因为构造器私有
2.可以通过类的静态方法getUnsafe()获取,但前提是调用此方法的类必须是通过启动类加载器加载(BootstrapClassLoader)的如ConcurrentHashMap或其内部类等,否则抛出异常SecurityException("Unsafe")
3.普通类想要获取Unsafe对象,可以通过反射
try {
// 注意:Unsafe的对象不能直接new,这里通过反射去获取Unsafe里面的私有成员变量theUnsafe
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe)field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
4.常用api
①objectFieldOffset()方法用于获取某个字段相对Java对象的“起始地址”的偏移量
long address = unsafe.objectFieldOffset(People.class.getDeclaredField("name"));
System.out.println("实例变量name属性的偏移量:"+address);
②unsafe.compareAndSwapInt这个方法,这个方法有四个参数,其中第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的valueOffset的值),第三个参数为期待的值,第四个为更新后的值。 ③putIntVolatile(Object obj, long offset, int value)设置obj对象中offset偏移地址对应的整型field的值为指定值。支持volatile store语义 设置后立马更新到内存对其他线程可见
③getIntVolatile(Object obj, long offset)获取obj对象中offset偏移地址对应的整型field的值,支持volatile load语义
④putObjectVolatile(Object obj, long offset, Object value)设置obj对象中offset偏移地址对应的object型field的值为指定值。支持volatile store语义
⑤arraybaseOffset(Class arrayClass)获取给定数组中第一个元素的偏移地址。为了存取数组中的元素,这个偏移地址与arrayIndexScale 法的非0返回值一起被使用
⑥arrayIndexScale(Class arrayClass)获取用户给定数组寻址的换算因子(其实就是数据中元素偏移地址的增量,因为数组中的元素的地址是连续的),一个合适的换算因子不能返回的时候(例如:基本类型),返回0
例:arraybaseOffset,获取数组第一个元素的偏移地址。arrayIndexScale,数组中元素的大小,占用多少个字节。arraybaseOffset与arrayIndexScale配合起来使用,就可以定位数组中每个元素在内存中的位置。
⑦putOrderedObject(Object obj, long offset, Object value)设置obj对象中offset偏移地址对应的object型field的值为指定值。这是一个有序或者有延迟的putObjectVolatile方法,并且不保证值的改变被其他线程立即看到。只有在field被volatile修饰并且期望被意外修改的时候使用才有用。
5.ConcurrentHashMap的put方法 (1)变量含义解析// segmentShift表示segment[]容量的二进制前导0的个数(32-sshift) // segmentMask 表示segment数组容量减1 // sshift 记录ssize右移的位数,也就是segment[]容量是2的几次幂
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment s;
-------------------①-------------------
if (value == null)
throw new NullPointerException();
--------------------②--------------------
int hash = hash(key);
---------------------③---------------------
int j = (hash >>> segmentShift) & segmentMask;
----------------------④--------------------
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + Sbase)) == null) // in ensureSegment
--------------------⑤-----------------------
s = ensureSegment(j);
----------------------⑥-----------------------
return s.put(key, hash, value, false);
}
(2)执行过程分析
①put的value值不能为null,否则抛出异常,因为我们通过get方法获取不到key时返回null,避免二义性
②获取key的hash值的方式与hashmap相同,
③先获取在segment数组中的索引j,获取方式:
hash >>> segmentShift 获取hash值的高sshift位,因为由hashmap中的结论可知,如果segment[]容量为2^4,那么索引值只与hash的低4位有关,它的作用是想让hash值的高位决定segment[]数组中的位置
& segmentMask 与数组长度减一相与
④获取segment数组中指定索引下标j处的segment对象,并判断是否为null,如果为null,就调用ensureSegment(j)方法,确定数组中位置j处的segment对象:
// ss表示数组中元素偏移地址的增量 // SSHIFT表示ss是2的几次幂 Class sc = Segment[].class; ss = UNSAFE.arrayIndexScale(sc); SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
(j << SSHIFT) + Sbase) ------> j * ss + Sbase 获取数组中下标为j的内存地址,
比如说ss是2^4,那么SSHIFT=4,j需要左移4位,也就是乘2^4,只是用位运算代替乘法效率更高
⑤ensureSegment(int k)
@SuppressWarnings("unchecked")
private Segment ensureSegment(int k) {
final Segment[] ss = this.segments;
long u = (k << SSHIFT) + Sbase; // raw offset
Segment seg;
// 1.再次判断索引j处的segment对象是否存在
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 2.如果不存在获取数组中索引为0处,开始初始化时添加好的segment对象
Segment proto = ss[0]; // use segment 0 as prototype
// 3.获取segemnt[0]处segemnt对象的属性值,通过proto
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry[] tab = (HashEntry[])new HashEntry[cap];
// 4.再次检查索引j处的segment对象是否存在
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
// 5.如果仍不存在,利用获取到的属性创建Segment对象
Segment s = new Segment(lf, threshold, tab);
// 6.此处采用了CAS的自旋锁,不断循环判断segment数组中索引为0处是否有对象,如果没有将创建好的segment对象添加到数组中
while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
// 7.最后将获取到的segment返回
return seg;
}
可以看到ensureSegment(int k)中采用了双重检查机制+CAS自旋锁,双重检查能够提高程序的执行效率,一但判断不为null,即刻返回结果,不需要执行多余的代码。并没有使用synchronized上锁,能够提高程序的并发度,但由于一直在进行循环,对cpu的消耗过大
⑥创建好了segment对象,就需要向其中的HashEntry[]中put Entry对象了,
为什么在向HashEntry[]中put时用了锁而不是CAS自旋锁呢?
因为由于我们已经采用分段减少了hashmap数组长度,即使上锁,并发度也大大提高了不需要再用自旋去循环消耗cpu性能了(个人理解,欢迎指正),。
// Segment继承了 ReentrantLock static final class Segmentextends ReentrantLock implements Serializable { ....... final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 1.使用trylock试着上锁,上锁成功返回true,失败返回false,不会阻塞,可使用此方法,循环上锁,在未获得锁时,做一些其他的事情 HashEntry node = tryLock() ? null : // 2.上锁失败调用scanAndLockForPut(key, hash, value),处理其他业务逻辑 scanAndLockForPut(key, hash, value); // 3.成功获得了锁 V oldValue; try { HashEntry [] tab = table; int index = (tab.length - 1) & hash; // 4.获取HashEntry[]数组中,指定索引处的HashEntry对象 HashEntry first = entryAt(tab, index); // 5.遍历链表 for (HashEntry e = first;;) { if (e != null) { K k; // 6.节点key相同覆盖 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; // 7.onlyIfAbsent默认是false,进行新换旧替换 if (!onlyIfAbsent) { e.value = value; // 8.增加操作次数 ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry (hash, key, value, first); int c = count + 1; // 9.数组中节点数量大于threshold扩容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 10.它的扩容操作只对每个段内的数组进行扩容 rehash(node); else // 11.利用UNSAFE.putOrderedObject()插入节点到链表中,因为HashMap对象没用volatile修饰,如果直接通过tab[index]=node赋值,只是在自己的工作空间进行了修改,不能保证替换后,其他线程的可见性,所以要直接在内存中修改 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } // 12.返回替换之前的或者新插入的值 return oldValue; } ....... }
scanAndLockForPut(K key, int hash, V value)
该函数的主要作用就是利用等待锁的空闲时间,创建HashEntry节点对象(可能创建成功也可能失败),至于while下的其他业务逻辑可能是为了循环慢一些,减少性能消耗?(不确定,知道的可以留言)
private HashEntryscanAndLockForPut(K key, int hash, V value) { // 1.获取hash值对应HashEntry[]中的HashEntry对象 HashEntry first = entryForHash(this, hash); HashEntry e = first; HashEntry node = null; int retries = -1; // negative while locating node // 2.循环检查是否上锁成功,没成功,进入循环 while (!tryLock()) { HashEntry f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry (hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } // 返回node值可以为null,也可以是创建好的HashEntry对象 return node; }
entryForHash(this, hash)
// Class tc = HashEntry[].class;
// ts = UNSAFE.arrayIndexScale(tc);
// TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
// TSHIFT表示ts的几次幂
@SuppressWarnings("unchecked")
static final HashEntry entryForHash(Segment seg, int h) {
HashEntry[] tab;
return (seg == null || (tab = seg.table) == null) ? null :
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + Tbase);
}
((tab.length - 1) & h)就是用hash的低位与HashMap[]数组长度-1相与,再获取指定索引处的HashMap对象,为什么取segment[]索引用高四位,而取hashmap[]索引用低四位呢?
首先高位低位结合使用,相当于使用扰动函数,增加hash值的散列性,减少hash碰撞的产生,还有就是如果两个数组获取索引时,都采用高位或者低位,当segemnt[]和每个段内hashmap[]长度相同时,会损失n*(n-1)个数组空间,因为不论是segment[]还是hashmap[],索引下标都由hash值决定,取相同位置的hash值,两个数组得到的索引一定相同,那么每个段内,只有segment[i].hashmap[i]才能放入节点
rehash(node)扩容
HashMap[]扩容操作,会一直循环遍历到链表尾部,标记扩容后,最后仍相连的头部节点,将他们一起移到新的链表中,然后再重新遍历,遍历到刚才标记的头部节点,采用头插法,一个节点一个节点的移动到新数组
@SuppressWarnings("unchecked")
private void rehash(HashEntry node) {
HashEntry[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry[] newTable =
(HashEntry[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry e = oldTable[i];
if (e != null) {
HashEntry next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry lastRun = e;
int lastIdx = idx;
for (HashEntry last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry n = newTable[k];
newTable[k] = new HashEntry(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
6.ConcurrentHashMap的get方法
JDK1.7的ConcurrentHashMap的get操作是不加锁的,因为在每个Segment中定义的HashEntry数组和在每个HashEntry中定义的value和next HashEntry节点都是volatile类型的,volatile类型的变量可以保证其在多线程之间的可见性,因此可以被多个线程同时读,从而不用加锁。而其get操作步骤也比较简单,定位Segment –> 定位HashEntry –> 通过getObjectVolatile()方法获取指定偏移量上的HashEntry –> 通过循环遍历链表获取对应值。
定位Segment:(((h >>> segmentShift) & segmentMask) << SSHIFT) + Sbase
定位HashEntry:(((tab.length - 1) & h)) << TSHIFT) + Tbase
public V get(Object key) {
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + Sbase;
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + Tbase);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
7.ConcurrentHashMap的size方法
ConcurrentHashMap的size操作的实现方法也非常巧妙,一开始并不对Segment加锁,而是直接尝试将所有的Segment元素中的count相加,这样执行两次,然后将两次的结果对比,如果两次结果相等则直接返回;而如果两次结果不同,则再将所有Segment加锁,然后再执行统计得到对应的size值。
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
视频链接:HashMap和ConcurrentHashMap深入解析源码合集,看完吊打面试官!|图灵周瑜_哔哩哔哩_bilibili
原文链接:Unsafe类API介绍_相信明天的博客-CSDN博客
原文链接:Unsafe 类的API说明_Java&&大数据专栏-CSDN博客
原文链接:java Unsafe类数组操作_没事搞点事做serendipity的博客-CSDN博客 原文链接:Java并发包concurrent——ConcurrentHashMap_上善若水的木偶戏-CSDN博客
参考书籍:java并发编程的艺术



