栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

原子操作类LongAdder原理及实现

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

原子操作类LongAdder原理及实现

文章目录

1. 简介2. 代码分析

1. 简介

背景:
AtomicLong 通过 CAS 提供了非阻塞的原子性操作,在高并发 下大量线程会同时去竞争更新 同 →个原子变量,但是由于同时只有一个线程的CAS 操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS 的操作,而这会白白浪费 CPU 资源。

LongAdder的思路:
把一个变量分解为多个变量 ,让同样多的线程去竞争多个资源

LongAdder维护了多个cell变量,每个 Cell 里面有 一个初始值为 0 的 long 型变量 ,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源 的并发量。另 外,多个线程在争夺 同 一个 Cell 原子变量时如果失败了 ,它并不是在当前 Ce ll 变量上一直自旋 CAS 重试 ,而是尝试在其他 Ce ll 的变量上进行 CAS 尝试 ,这个改变增加了当前线程重试 CAS 成功的可能性
获取 LongAdder 当 前值时, 是把所有 Cell 变量 的 value 值累加后再加上 base 返回的

public class LongAdderDemo {
	public static void main(String[] args) {
		LongAdder adder=new LongAdder();
		adder.add(10);
		adder.add(20);
		System.out.println(adder.intValue());
	}

}


2. 代码分析

public class LongAdder extends Striped64 implements Serializable: LongAdder类继承自Stripe64类,并且实现了 Serializable接口

//Stripe64类中的结构
// @sun.misc.Contended避免了伪共享
 @sun.misc.Contended static final class Cell {
        volatile long value;//volatile保证变量的内存可见性
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
 transient volatile Cell[] cells;
 transient volatile long base;

1. sum方法

//LongAdder类
public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

sum返回当前的值,加所有Cell 内部的value值后再累加 base,返回值不是很精准,因为在调用过程中可能有其他线程对cell数组中的值进行了更改

2. reset方法

//将base的值和cell数组的值置为0
 public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }

3. sumThenReset方法

//累加后再将base和cell数组值清零  sum+reset
 public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

4. add方法

  public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casbase(b = base, b + x)) {//1
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||//2
                (a = as[getProbe() & m]) == null ||//3
                !(uncontended = a.cas(v = a.value, v + x)))//4
                longAccumulate(x, null, uncontended);//5
        }
    }
    //通过UNSAFE类的CAS设置成员变量base的值为base+x(要累加的值
  final boolean casbase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, base, cmp, val);
    }

主要来观察一下if判断语句中几个条件:

内层if语句:

    (as = cells) != null: cells数组不为null,说明存在争用;在不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组(as = cells) == null的话会执行!casbase(b = base, b + x): cells数组为null,表示之前不存在争用,如果此次casbase执行成功,表示基于base成员累加成功,add方法直接返回;如果casbase方法执行失败,说明产生了第一次争用冲突,需要对cells数组初始化,此时即将进入内层if块

外层if语句:

    CASE1: as==null||(m=as.length-1)<0: 表示cell数组没有被初始化CASE2: (a = as[getProbe() & m]) == null: 当前线程的hash值在cells数组映射位置的Cell对象为空,意思是还没有其他线程在同一个位置做过累加操作CASE3: !(uncontended = a.cas(v = a.value, v + x)): 当前线程的哈希值在cells数组映射位置的Cell对象不为空,然后在该Cell对象上进行CAS操作,设置其值为v+x(x为该Cell需要累加的值),CAS成功的话add方法执行结束,执行成功的话接下来会执行longAccumulate(x, null, uncontended)语句

3个CASE只要有1个满足就会执行longAccumulate方法

下面来看longAccumulate方法:

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;//collidae为true表示可以扩容  false表示不可以       
        for (;;) {//自旋操作  直到成功  
        //as:  cells的引用    a: 当前线程命中的cell
        //n: cells数组长度    v:期望值
            Cell[] as; Cell a; int n; long v;
      				//CASE1
            if ((as = cells) != null && (n = as.length) > 0) {//数组已经初始化了
            		//CASE1.1
                if ((a = as[(n - 1) & h]) == null) {//命中的cell位置为null-->创建一个Cell
                    if (cellsBusy == 0) {//没有处于创建、扩容阶段       
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //CASE1.2
                else if (!wasUncontended)// wasUncontended=false 当前线程CAS竞争失败
                    wasUncontended = true;      // Continue after rehash
                 //当前线程reshash过哈希值  CAS更新Cell
                 //CASE1.3
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //CASE1.4   
                else if (n >= NCPU || cells != as) //调整扩容意向 进入下一轮循环
                    collide = false;            // At max size or stale
                //CASE1.5
                else if (!collide)
                    collide = true;//设置扩容意向为true 但不一定发生扩容
                //CASE1.6
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //真正扩容的逻辑
            //CASE2
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //CASE3
            else if (casbase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

参考:
《Java高并发核心编程(卷II)》
《Java并发编程之美》

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/782196.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号