栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

解决HashMap线程不安全之ConcurrentHashMap源码剖析(jdk1.7)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

解决HashMap线程不安全之ConcurrentHashMap源码剖析(jdk1.7)

一、jdk1.7中的ConcurrentHashMap 1.为什么会出现ConcurrentHashMap?

在并发编程中使用HashMap可能会导致循环引用和数据丢失的问题,而线程安全的HashTable容器由于使用了synchronnized,效率又非常低下,基于以上两个原因便有了ConcurrentHashMap

2.ConcurrentHashMap使用了分段锁技术的介绍?

HashTable 容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable 的线程都必须竞争同一把锁。假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是 ConcurrentHashMap 所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

3.ConcurrentHashMap的结构

ConcurrentHashMap由Segment数组结构和HashEntry数组结构组成,Segment是一种可重入锁(ReetrantLock),HashEntry 则用于

存储键值对数据,与HashMap中的Entry类似,但是它的value和next都加了volatile修饰,并且hash值也用了final修饰,也就是无法更改一个HashEntry对象的hash值

static class Entry implements Map.Entry {
    final K key;
    V value;
    Entry next;
    int hash;
static final class HashEntry {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry next;

4.ConcurrentHashMap的构造函数分析 (1)变量含义解析

//initialCapacity 初始容量 指全部HashEntry[]的容量和
//loadFactor 负载因子  默认0.75
//concurrencyLevel 并发级别(并发度),通过它来控制Segment数组的容量(能够划分的段数),也就是控制可以有多少个线程并发操作HashEntry[], 默认16 
//MAX_SEGMENTS 规定了Segment数组的最大容量,也就是最多能分成多少段   默认1 << 16(2^16)
//ssize 用来表示Segment[]的容量大小
// sshift 记录ssize右移的位数,也就是segment[]容量是2的几次幂
// segmentShift表示segment[]容量的二进制前导0的个数
// segmentMask 表示segment数组容量减1
// cap表示每个段内数组容量的大小

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    -----------------①---------------------
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    -----------------②--------------------
    int c = initialCapacity / ssize;
    ------------------③--------------------
    if (c * ssize < initialCapacity)
        ++c;
    -----------------④---------------------
    in}t cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    ------------------⑤---------------------
    // create segments and segments[0]
    Segment s0 =
        new Segment(loadFactor, (int)(cap * loadFactor),
                         (HashEntry[])new HashEntry[cap]);
    Segment[] ss = (Segment[])new Segment[ssize];
    -------------------⑥---------------------
     //Class sc = Segment[].class;
     //Sbase = UNSAFE.arraybaseOffset(sc);
     // Sbase表示数组中第一个元素的偏移地址
    UNSAFE.putOrderedObject(ss, Sbase, s0); // ordered write of segments[0]
    this.segments = ss;
(2)执行过程分析

由于Segment[]和每个段内HashEntry[]的容量都必须是2的幂次方,这样就可以保证通过hash&(数组长度-1)获取到的索引值落在数组长度范围内,并且索引值由hash值来决定(与hashmap中相同)

①的位置首先判断segment数组长度是否达到了并发度,如果没有达到的话可以,通过左移1位增加数组长度,直到大于等于concurrencyLevel

②获取到segment数组的容量后,需要计算出每个segment段内,应该放容量为多少的HashEntry数组,通过int c = initialCapacity / ssize,能够获得,但是因为 / 为整除运算,所以获得的数组容量一定是舍去了小数部分的,假设当initialCapacity为17而ssize为16的话,c=1,就会丢失一个容量

③通过if (c * ssize < initialCapacity)判断是否刚好可以整除,如果可以则证明不会出现丢失容量的情况;如果不可以,即c * ssize < initialCapacity,则证明有小数部分的存在,有容量的丢失,使c++

④虽然保证了HashEntry[]数组的初始总容量>=我们传入的初始容量,但是却不能保证每个段内的HashEntry[]数组容量为2的幂次方,所以利用cap <<= 1;获得2的幂次方的HashEntry[]数组容量

⑤创建Segment对象和Segment[]数组,为什么要先创建一个Segment对象呢?

因为segment的构造器中有一些赋值操作,又由于Segment[]数组中每一个segment对象都是相同的,如果创建好了一个segment[0],这些属性值只需要从segment[0]中去取就好了(比如segment数组长度ssize,hashmap长度cap等),不需要重新获取一遍了,节省了时间

Segment(float lf, int threshold, HashEntry[] tab) {
    this.loadFactor = lf;
    this.threshold = threshold;
    this.table = tab;
}

⑥调用unsafe类的putOrderedObject方法,将造好的segment对象s0,放入数组的起始位置

---------------------------------------------Unsafe类--------------------------------------------------

Unsafe类通过JNI的方式访问本地的C++实现库从而使java具有了直接操作内存空间的能力,Unsafe提供了硬件级别的原子性操作

public final class Unsafe {
    // 在静态方法中已经初始化好了
        private static final Unsafe theUnsafe;
                .......
        private Unsafe() {
        }
​
    @CallerSensitive
    public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();
        if (var0.getClassLoader() != null) {
            throw new SecurityException("Unsafe");
        } else {
            return theUnsafe;
        }
    }
    ......
    }

1.unsafe类不能直接new,因为构造器私有

2.可以通过类的静态方法getUnsafe()获取,但前提是调用此方法的类必须是通过启动类加载器加载(BootstrapClassLoader)的如ConcurrentHashMap或其内部类等,否则抛出异常SecurityException("Unsafe")

3.普通类想要获取Unsafe对象,可以通过反射

    try {
        // 注意:Unsafe的对象不能直接new,这里通过反射去获取Unsafe里面的私有成员变量theUnsafe
        Field field = Unsafe.class.getDeclaredField("theUnsafe");
        field.setAccessible(true);
        unsafe = (Unsafe)field.get(null);
    } catch (Exception e) {
        e.printStackTrace();
    }

4.常用api

①objectFieldOffset()方法用于获取某个字段相对Java对象的“起始地址”的偏移量

long address = unsafe.objectFieldOffset(People.class.getDeclaredField("name"));
System.out.println("实例变量name属性的偏移量:"+address);

②unsafe.compareAndSwapInt这个方法,这个方法有四个参数,其中第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的valueOffset的值),第三个参数为期待的值,第四个为更新后的值。 ③putIntVolatile(Object obj, long offset, int value)设置obj对象中offset偏移地址对应的整型field的值为指定值。支持volatile store语义 设置后立马更新到内存对其他线程可见

③getIntVolatile(Object obj, long offset)获取obj对象中offset偏移地址对应的整型field的值,支持volatile load语义

④putObjectVolatile(Object obj, long offset, Object value)设置obj对象中offset偏移地址对应的object型field的值为指定值。支持volatile store语义

⑤arraybaseOffset(Class arrayClass)获取给定数组中第一个元素的偏移地址。为了存取数组中的元素,这个偏移地址与arrayIndexScale 法的非0返回值一起被使用

⑥arrayIndexScale(Class arrayClass)获取用户给定数组寻址的换算因子(其实就是数据中元素偏移地址的增量,因为数组中的元素的地址是连续的),一个合适的换算因子不能返回的时候(例如:基本类型),返回0

例:arraybaseOffset,获取数组第一个元素的偏移地址。arrayIndexScale,数组中元素的大小,占用多少个字节。arraybaseOffset与arrayIndexScale配合起来使用,就可以定位数组中每个元素在内存中的位置。

⑦putOrderedObject(Object obj, long offset, Object value)设置obj对象中offset偏移地址对应的object型field的值为指定值。这是一个有序或者有延迟的putObjectVolatile方法,并且不保证值的改变被其他线程立即看到。只有在field被volatile修饰并且期望被意外修改的时候使用才有用。

5.ConcurrentHashMap的put方法 (1)变量含义解析
// segmentShift表示segment[]容量的二进制前导0的个数(32-sshift)
// segmentMask 表示segment数组容量减1
// sshift 记录ssize右移的位数,也就是segment[]容量是2的几次幂
@SuppressWarnings("unchecked")
public V put(K key, V value) {
    Segment s;
    -------------------①-------------------
    if (value == null)
        throw new NullPointerException();
    --------------------②--------------------
    int hash = hash(key);
    ---------------------③---------------------
    int j = (hash >>> segmentShift) & segmentMask;
    ----------------------④--------------------
    if ((s = (Segment)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + Sbase)) == null) //  in ensureSegment
     --------------------⑤-----------------------
        s = ensureSegment(j);
    ----------------------⑥-----------------------
    return s.put(key, hash, value, false);
}
(2)执行过程分析

①put的value值不能为null,否则抛出异常,因为我们通过get方法获取不到key时返回null,避免二义性

②获取key的hash值的方式与hashmap相同,

③先获取在segment数组中的索引j,获取方式:

hash >>> segmentShift 获取hash值的高sshift位,因为由hashmap中的结论可知,如果segment[]容量为2^4,那么索引值只与hash的低4位有关,它的作用是想让hash值的高位决定segment[]数组中的位置

& segmentMask 与数组长度减一相与

④获取segment数组中指定索引下标j处的segment对象,并判断是否为null,如果为null,就调用ensureSegment(j)方法,确定数组中位置j处的segment对象:

// ss表示数组中元素偏移地址的增量
// SSHIFT表示ss是2的几次幂
Class sc = Segment[].class;
ss = UNSAFE.arrayIndexScale(sc);
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);

(j << SSHIFT) + Sbase) ------> j * ss + Sbase 获取数组中下标为j的内存地址,

比如说ss是2^4,那么SSHIFT=4,j需要左移4位,也就是乘2^4,只是用位运算代替乘法效率更高

⑤ensureSegment(int k)

@SuppressWarnings("unchecked")
private Segment ensureSegment(int k) {
    final Segment[] ss = this.segments;
    long u = (k << SSHIFT) + Sbase; // raw offset
    Segment seg;
    // 1.再次判断索引j处的segment对象是否存在
    if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 2.如果不存在获取数组中索引为0处,开始初始化时添加好的segment对象
        Segment proto = ss[0]; // use segment 0 as prototype
        // 3.获取segemnt[0]处segemnt对象的属性值,通过proto
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry[] tab = (HashEntry[])new HashEntry[cap];
        // 4.再次检查索引j处的segment对象是否存在
        if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
         // 5.如果仍不存在,利用获取到的属性创建Segment对象
            Segment s = new Segment(lf, threshold, tab);
         // 6.此处采用了CAS的自旋锁,不断循环判断segment数组中索引为0处是否有对象,如果没有将创建好的segment对象添加到数组中
            while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    // 7.最后将获取到的segment返回
    return seg;
}

可以看到ensureSegment(int k)中采用了双重检查机制+CAS自旋锁,双重检查能够提高程序的执行效率,一但判断不为null,即刻返回结果,不需要执行多余的代码。并没有使用synchronized上锁,能够提高程序的并发度,但由于一直在进行循环,对cpu的消耗过大

⑥创建好了segment对象,就需要向其中的HashEntry[]中put Entry对象了,

为什么在向HashEntry[]中put时用了锁而不是CAS自旋锁呢?

因为由于我们已经采用分段减少了hashmap数组长度,即使上锁,并发度也大大提高了不需要再用自旋去循环消耗cpu性能了(个人理解,欢迎指正),。

// Segment继承了 ReentrantLock
static final class Segment extends ReentrantLock implements Serializable {
     .......
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 1.使用trylock试着上锁,上锁成功返回true,失败返回false,不会阻塞,可使用此方法,循环上锁,在未获得锁时,做一些其他的事情
    HashEntry node = tryLock() ? null :
    // 2.上锁失败调用scanAndLockForPut(key, hash, value),处理其他业务逻辑
        scanAndLockForPut(key, hash, value);
    // 3.成功获得了锁
    V oldValue;
    try {
        HashEntry[] tab = table;
        int index = (tab.length - 1) & hash;
        // 4.获取HashEntry[]数组中,指定索引处的HashEntry对象
        HashEntry first = entryAt(tab, index);
        // 5.遍历链表
        for (HashEntry e = first;;) {
            if (e != null) {
                K k;
         // 6.节点key相同覆盖
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
          // 7.onlyIfAbsent默认是false,进行新换旧替换
                    if (!onlyIfAbsent) {
                        e.value = value;
           // 8.增加操作次数
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry(hash, key, value, first);
                int c = count + 1;
            // 9.数组中节点数量大于threshold扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
             // 10.它的扩容操作只对每个段内的数组进行扩容
                    rehash(node);
                else
              // 11.利用UNSAFE.putOrderedObject()插入节点到链表中,因为HashMap对象没用volatile修饰,如果直接通过tab[index]=node赋值,只是在自己的工作空间进行了修改,不能保证替换后,其他线程的可见性,所以要直接在内存中修改
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
         // 12.返回替换之前的或者新插入的值
    return oldValue;
}
     .......
 }

scanAndLockForPut(K key, int hash, V value)

该函数的主要作用就是利用等待锁的空闲时间,创建HashEntry节点对象(可能创建成功也可能失败),至于while下的其他业务逻辑可能是为了循环慢一些,减少性能消耗?(不确定,知道的可以留言)

private HashEntry scanAndLockForPut(K key, int hash, V value) {
    // 1.获取hash值对应HashEntry[]中的HashEntry对象
    HashEntry first = entryForHash(this, hash);
    HashEntry e = first;
    HashEntry node = null;
    int retries = -1; // negative while locating node
    // 2.循环检查是否上锁成功,没成功,进入循环
    while (!tryLock()) {
        HashEntry f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    // 返回node值可以为null,也可以是创建好的HashEntry对象
    return node;
}

entryForHash(this, hash)

//  Class tc = HashEntry[].class;
//  ts = UNSAFE.arrayIndexScale(tc);
//  TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
// TSHIFT表示ts的几次幂
@SuppressWarnings("unchecked")
static final  HashEntry entryForHash(Segment seg, int h) {
    HashEntry[] tab;
    return (seg == null || (tab = seg.table) == null) ? null :
        (HashEntry) UNSAFE.getObjectVolatile
        (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + Tbase);
}

((tab.length - 1) & h)就是用hash的低位与HashMap[]数组长度-1相与,再获取指定索引处的HashMap对象,为什么取segment[]索引用高四位,而取hashmap[]索引用低四位呢?

首先高位低位结合使用,相当于使用扰动函数,增加hash值的散列性,减少hash碰撞的产生,还有就是如果两个数组获取索引时,都采用高位或者低位,当segemnt[]和每个段内hashmap[]长度相同时,会损失n*(n-1)个数组空间,因为不论是segment[]还是hashmap[],索引下标都由hash值决定,取相同位置的hash值,两个数组得到的索引一定相同,那么每个段内,只有segment[i].hashmap[i]才能放入节点

rehash(node)扩容

HashMap[]扩容操作,会一直循环遍历到链表尾部,标记扩容后,最后仍相连的头部节点,将他们一起移到新的链表中,然后再重新遍历,遍历到刚才标记的头部节点,采用头插法,一个节点一个节点的移动到新数组

 

@SuppressWarnings("unchecked")
private void rehash(HashEntry node) {
​
    HashEntry[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    HashEntry[] newTable =
        (HashEntry[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry e = oldTable[i];
        if (e != null) {
            HashEntry next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)   //  Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry lastRun = e;
                int lastIdx = idx;
                for (HashEntry last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry n = newTable[k];
                    newTable[k] = new HashEntry(h, p.key, v, n);
                }
            }
        }
    }
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
6.ConcurrentHashMap的get方法

JDK1.7的ConcurrentHashMap的get操作是不加锁的,因为在每个Segment中定义的HashEntry数组和在每个HashEntry中定义的value和next HashEntry节点都是volatile类型的,volatile类型的变量可以保证其在多线程之间的可见性,因此可以被多个线程同时读,从而不用加锁。而其get操作步骤也比较简单,定位Segment –> 定位HashEntry –> 通过getObjectVolatile()方法获取指定偏移量上的HashEntry –> 通过循环遍历链表获取对应值。

定位Segment:(((h >>> segmentShift) & segmentMask) << SSHIFT) + Sbase

定位HashEntry:(((tab.length - 1) & h)) << TSHIFT) + Tbase

public V get(Object key) {
    Segment s; // manually integrate access methods to reduce overhead
    HashEntry[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + Sbase;
    if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + Tbase);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
7.ConcurrentHashMap的size方法

ConcurrentHashMap的size操作的实现方法也非常巧妙,一开始并不对Segment加锁,而是直接尝试将所有的Segment元素中的count相加,这样执行两次,然后将两次的结果对比,如果两次结果相等则直接返回;而如果两次结果不同,则再将所有Segment加锁,然后再执行统计得到对应的size值。

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

视频链接:HashMap和ConcurrentHashMap深入解析源码合集,看完吊打面试官!|图灵周瑜_哔哩哔哩_bilibili

原文链接:Unsafe类API介绍_相信明天的博客-CSDN博客

原文链接:Unsafe 类的API说明_Java&&大数据专栏-CSDN博客

原文链接:java Unsafe类数组操作_没事搞点事做serendipity的博客-CSDN博客 原文链接:Java并发包concurrent——ConcurrentHashMap_上善若水的木偶戏-CSDN博客

参考书籍:java并发编程的艺术

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/763808.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号