1. 简介2. 代码分析
1. 简介背景:
AtomicLong 通过 CAS 提供了非阻塞的原子性操作,在高并发 下大量线程会同时去竞争更新 同 →个原子变量,但是由于同时只有一个线程的CAS 操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS 的操作,而这会白白浪费 CPU 资源。
LongAdder的思路:
把一个变量分解为多个变量 ,让同样多的线程去竞争多个资源
LongAdder维护了多个cell变量,每个 Cell 里面有 一个初始值为 0 的 long 型变量 ,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源 的并发量。另 外,多个线程在争夺 同 一个 Cell 原子变量时如果失败了 ,它并不是在当前 Ce ll 变量上一直自旋 CAS 重试 ,而是尝试在其他 Ce ll 的变量上进行 CAS 尝试 ,这个改变增加了当前线程重试 CAS 成功的可能性
获取 LongAdder 当 前值时, 是把所有 Cell 变量 的 value 值累加后再加上 base 返回的
public class LongAdderDemo {
public static void main(String[] args) {
LongAdder adder=new LongAdder();
adder.add(10);
adder.add(20);
System.out.println(adder.intValue());
}
}
2. 代码分析
public class LongAdder extends Striped64 implements Serializable: LongAdder类继承自Stripe64类,并且实现了 Serializable接口
//Stripe64类中的结构
// @sun.misc.Contended避免了伪共享
@sun.misc.Contended static final class Cell {
volatile long value;//volatile保证变量的内存可见性
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
transient volatile Cell[] cells;
transient volatile long base;
1. sum方法
//LongAdder类
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
sum返回当前的值,加所有Cell 内部的value值后再累加 base,返回值不是很精准,因为在调用过程中可能有其他线程对cell数组中的值进行了更改
2. reset方法
//将base的值和cell数组的值置为0
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
3. sumThenReset方法
//累加后再将base和cell数组值清零 sum+reset
public long sumThenReset() {
Cell[] as = cells; Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
4. add方法
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casbase(b = base, b + x)) {//1
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||//2
(a = as[getProbe() & m]) == null ||//3
!(uncontended = a.cas(v = a.value, v + x)))//4
longAccumulate(x, null, uncontended);//5
}
}
//通过UNSAFE类的CAS设置成员变量base的值为base+x(要累加的值
final boolean casbase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, base, cmp, val);
}
主要来观察一下if判断语句中几个条件:
内层if语句:
- (as = cells) != null: cells数组不为null,说明存在争用;在不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组(as = cells) == null的话会执行!casbase(b = base, b + x): cells数组为null,表示之前不存在争用,如果此次casbase执行成功,表示基于base成员累加成功,add方法直接返回;如果casbase方法执行失败,说明产生了第一次争用冲突,需要对cells数组初始化,此时即将进入内层if块
外层if语句:
- CASE1: as==null||(m=as.length-1)<0: 表示cell数组没有被初始化CASE2: (a = as[getProbe() & m]) == null: 当前线程的hash值在cells数组映射位置的Cell对象为空,意思是还没有其他线程在同一个位置做过累加操作CASE3: !(uncontended = a.cas(v = a.value, v + x)): 当前线程的哈希值在cells数组映射位置的Cell对象不为空,然后在该Cell对象上进行CAS操作,设置其值为v+x(x为该Cell需要累加的值),CAS成功的话add方法执行结束,执行成功的话接下来会执行longAccumulate(x, null, uncontended)语句
3个CASE只要有1个满足就会执行longAccumulate方法
下面来看longAccumulate方法:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false;//collidae为true表示可以扩容 false表示不可以
for (;;) {//自旋操作 直到成功
//as: cells的引用 a: 当前线程命中的cell
//n: cells数组长度 v:期望值
Cell[] as; Cell a; int n; long v;
//CASE1
if ((as = cells) != null && (n = as.length) > 0) {//数组已经初始化了
//CASE1.1
if ((a = as[(n - 1) & h]) == null) {//命中的cell位置为null-->创建一个Cell
if (cellsBusy == 0) {//没有处于创建、扩容阶段
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//CASE1.2
else if (!wasUncontended)// wasUncontended=false 当前线程CAS竞争失败
wasUncontended = true; // Continue after rehash
//当前线程reshash过哈希值 CAS更新Cell
//CASE1.3
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//CASE1.4
else if (n >= NCPU || cells != as) //调整扩容意向 进入下一轮循环
collide = false; // At max size or stale
//CASE1.5
else if (!collide)
collide = true;//设置扩容意向为true 但不一定发生扩容
//CASE1.6
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//真正扩容的逻辑
//CASE2
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//CASE3
else if (casbase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
参考:
《Java高并发核心编程(卷II)》
《Java并发编程之美》



