栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

1.8 的ConcurrentHashMap要得不

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

1.8 的ConcurrentHashMap要得不

Unsafe

JAVA中神奇的双刃剑--Unsafe - throwable - 博客园

使用Unsafe要注意以下几个问题:

  • 1、Unsafe有可能在未来的Jdk版本移除或者不允许Java应用代码使用,这一点可能导致使用了Unsafe的应用无法运行在高版本的Jdk。
  • 2、Unsafe的不少方法中必须提供原始地址(内存地址)和被替换对象的地址,偏移量要自己计算,一旦出现问题就是JVM崩溃级别的异常,会导致整个JVM实例崩溃,表现为应用程序直接crash掉。
  • 3、Unsafe提供的直接内存访问的方法中使用的内存不受JVM管理(无法被GC),需要手动管理,一旦出现疏忽很有可能成为内存泄漏的源头。
JAVA 1.8
构造方法

在1.8中它的5个构造方法

// 空构造 默认长度16
ConcurrentHashMap defultChm = new ConcurrentHashMap<>();
// hashMap 32; 1.7 ConcurrentHashMap 32;  1.8 ConcurrentHashMap 64;
ConcurrentHashMap initialChm = new ConcurrentHashMap<>(32);
// putAll传递的Map 默认长度16
ConcurrentHashMap mapToChm = new ConcurrentHashMap<>(new HashMap<>());
// 设置初始长度和负载因子
ConcurrentHashMap initLoadChm = new ConcurrentHashMap<>(32,0.75f);
// 设置初始长度、负载因子、正在扩容的线程数
ConcurrentHashMap allChm = new ConcurrentHashMap<>(32,0.75f,1);
 
    public ConcurrentHashMap() {
    }

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

    public ConcurrentHashMap(Map m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

而在初始化时除了空参构造外其他构造方法都使用了一个非常重要的参数sizeCtl

    
    private transient volatile int sizeCtl;

    
    private transient volatile int sizeCtl;

sizeCtl有控制表初始化和调整大小的含义。 
     * sizeCtl为0,代表未初始化,且数组的初始容量为16。
     * sizeCtl为正数,如果数组未初始化记录的是初始容量,如果数组已经初始化代表的扩容阈值
     * sizeCtl为-1,表示正在初始化
     * sizeCtl小于-1,表示数组正在扩容,-(1+n)表示此时有n个线程正在共同完成数组的扩容

PUT方法

ConcurrentHashMap的初始容量并不是在初始化的时候创建的,而是在第一次put的时候实现的

// 如果tab对象为空进行初始化
if (tab == null || (n = tab.length) == 0)
    tab = initTable();

初始化后在put的时候如果元素节点为空,创建一个新元素赋值

    // 如果tab对象为空进行初始化
    if (tab == null || (n = tab.length) == 0)
        tab = initTable();
    // 如果获取的节点为空
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        // cas校验,如果节点为空创建一个新元素赋值
        if (casTabAt(tab, i, null,
            new Node(hash, key, value, null)))
            break;                   // no lock when adding to empty bin 添加到空empty时没有锁定
    }

如果节点不为空并且hash为扩容hash进行协助扩容不进行节点新增

    // 如果tab对象为空进行初始化
    if (tab == null || (n = tab.length) == 0)
        tab = initTable();
    // 如果获取的节点为空
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        // cas校验,如果节点为空创建一个新元素赋值
        if (casTabAt(tab, i, null,
            new Node(hash, key, value, null)))
            break;                   // no lock when adding to empty bin 添加到空empty时没有锁定
    }    
    // 多线程协助扩容,如果当前节点正在扩容
    else if ((fh = f.hash) == MOVED)
        tab = helpTransfer(tab, f);

else进行数据新增,这时使用了synchronized锁来保证线程安全,sync锁的只有单个节点对象,不锁整个数组,其他节点可以同步操作,这样效率大大增加

    synchronized (f) {
        // 防止有元素做完操作后将tab变为了树,元素发生了变化,有可能不是原来的对象
        if (tabAt(tab, i) == f) {
            // 如果hash值大于0则代表是链表结构
            if (fh >= 0) {
                binCount = 1;
                for (Node e = f;; ++binCount) {
                    K ek;
                    // 如果hash值一样,并且key相同
                    if (e.hash == hash &&
                            ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                        // 获取旧元素的val
                        oldVal = e.val;
                        // 是否替换值,false是替换,true是不替换
                        if (!onlyIfAbsent)
                            e.val = value;
                        break;
                    }
                    Node pred = e; // 存放当前引用,下个next为空时以便关联节点
                    if ((e = e.next) == null) {
                        // 新增节点到链表末尾
                        pred.next = new Node(hash, key,
                                value, null);
                        break;
                    }
                }
            }
            // 如果是树结构
            else if (f instanceof TreeBin) {
                Node p;
                binCount = 2;
                if ((p = ((TreeBin)f).putTreeval(hash, key,
                        value)) != null) {
                    oldVal = p.val;
                    if (!onlyIfAbsent)
                        p.val = value;
                }
            }
        }
    }

在新增完元素后需要判断链表长度判断是否转为树

    if (binCount != 0) {
        // 如果链表长度大于等于8时不一定转树
        if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i);
        if (oldVal != null)
            return oldVal;
        break;
    }

最后一步增加集合长度和协助扩容

    // 集合长度增加
    addCount(1L, binCount);

在线程对ConcurrentHashMap进行元素新增后要对元素个数进行维护

addCount方法分为两部分:

1. 维护集合长度
如果counterCells数组为空,再通过cas的方式修改baseCOUNT,如果修改成功则不需要对counterCells数组进行维护

2. 协助扩容

如果新增后的元素个数大于等于扩容阈值,并且table长度小于1073741824时进行协助扩容逻辑,然后判断当前扩容动作是否完成,当前线程有没有达到最大线程,如果符合直接break不参与扩容,否则通过cas的方式增加SIZECTL的值来同步增加的线程扩容数量,然后进行协助扩容操作

    
    private final void addCount(long x, int check) {
        
        CounterCell[] as; long b, s;
        // counterCells是内部长度数组,默认是空
        // 如果当前baseCount与内存baseCOUNT相等对baseCount+x
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, baseCOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // 判断内部长度数组是否为空
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandomLab.getProbe() & m]) == null ||
                    !(uncontended =
                            U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // 如果没有对baseCOUNT增加成功将会增加到counterCells内部长度数组中
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount(); // 获取baseCount和counterCells数组value值累加的总和
        }
        
        if (check >= 0) { // check在移除和替换时大于0
            Node[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n); // 获取标记
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0) // 判断当前扩容动作有没有完成、当前线程有没有达到最大线程
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 协助扩容,并增加sc线程数量
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2)) // (rs << RESIZE_STAMP_SHIFT)左移16位,SIZECTL变为负数,状态变更为扩容中
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

协助扩容:transfer方法

第一步:计算分配任务量

如果当前cpu核数是单个直接分配给全量长度,单线程不需要分配,如果核数>1则得到一个(1/4n) / 核数的值,如果得到的值小于16直接赋予16,最小执行任务量为16

第二步:如果新数组为空创建新数组

创建一个之前两倍大小的新Node数组

第三步:计算当前现场负责部分

同样使用cas进行更新要负责迁移数组的大小,例如之前数组64-16=48,更新为48-16=32

第四步:扩容是否完成逻辑

如果扩容完成finishing状态为True,nextTable置空,table引用nextTable,sizeCtl更新为下一次扩容阈值

如果扩容没有完成返回继续协助扩容

第五步:如果当前位置没有元素不进行迁移,在此位置增加fwd元素标识

第六步:如果当前节点已经是fwd节点不需再迁移

第七步:实际迁移动作,对当前对象进行synchronized加锁操作,防止被新增

    
    private final void transfer(Node[] tab, Node[] nextTab) {
        int n = tab.length, stride;
        
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 判断cpu大于1的时候进行多线程协助
            stride = MIN_TRANSFER_STRIDE; // subdivide range ( 细分范围 ) 最小的单元为16
        
        if (nextTab == null) {            // initiating ( 开始 )
            try {
                @SuppressWarnings("unchecked")
                Node[] nt = (Node[])new Node[n << 1]; // 新建一个2倍容量的数组
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab; // nextTable记录新的数组
            transferIndex = n;  // 使用transferIndex记录当前旧数组的size
        }
        int nextn = nextTab.length;
        ForwardingNode fwd = new ForwardingNode(nextTab); // 定义为fwd节点代表当前位置正在被迁移,并且hash也是MOVED
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node f; int fh;
            while (advance) {
                
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ? // 如果下一次的节点大于16进行分割
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                
                if (finishing) {
                    nextTable = null;
                    table = nextTab; // table引用
                    sizeCtl = (n << 1) - (n >>> 1); // 设置sizeCtl (2n) - (1/2n) = 1.5n = 0.75 * 2n
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null) // 如果当前位置没有元素无需迁移,增加fwd元素标识
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED) // 如果已经迁移过则不需再迁移
                advance = true; // already processed
            else {
                
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node lastRun = f;
                            for (Node p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node(ph, pk, pv, ln);
                                else
                                    hn = new Node(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin t = (TreeBin)f;
                            TreeNode lo = null, loTail = null;
                            TreeNode hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode p = new TreeNode
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/678267.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号