栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Java多线程】JUC之CAS机制与原子类型(Atomic)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Java多线程】JUC之CAS机制与原子类型(Atomic)

文章目录
  • 前言
  • 了解高并发必须知道的概念
  • 了解Java并发包Concurrent发展简述
  • 了解锁的概念
  • CAS是什么
  • 了解Unsafe类是什么
    • 1.JUC之魔法类(Unsafe)解析
    • 2.Unsafe实现CAS的核心 API
  • Atomic工具包
    • 1.常用原子类型
    • 2.使用案例
      • 2.1.测试原子累加和非原子累加
      • 2.2.使用Unsafe属性进行原子操作
  • AtomicInteger源码浅析
  • CAS缺陷
    • 1.CAS缺点有哪些?
    • 2.ABA问题的解决方案之AtomicStampedReference
  • 拓展
    • 1.CAS实现单例模式
    • 2.使用Unsafe实现一个简单原子类型

前言

并发编程最佳学习路线
【Java多线程】了解线程的锁池和等待池概念
【Java基础】多线程从入门到掌握
【Java多线程】线程通信


了解高并发必须知道的概念

【Java多线程】高并发修炼基础之高并发必须了解的概念

了解Java并发包Concurrent发展简述

【Java多线程】JUC之Java并发包Concurrent发展简述(各版本JDK中的并发技术)

了解锁的概念

【Java多线程】成神之路中必须要了解的锁分类

CAS是什么
  • 我们平时用(synchronized,Lock)都属于悲观锁。它总是人为每次修改数据之前都可能被其他人(线程)修改,所以在访问资源的时候就会对资源进行加锁。当线程获取到锁后,其他需要锁获取锁的线程就要阻塞,等待持有锁的线程释放锁。

  • CAS机制属于乐观锁,乐观锁的核心思路就是每次不加锁而是假设修改数据之前其他线程一定不会修改,如果因为修改过产生冲突就失败就重试,直到成功为止。

CAS的全称是Compare And Swap,翻译过来就是比较并交换。是一种无锁算法,是Java提供的非阻塞原子性操作。在不使用锁的情况下实现多线程下的同步。在并发包中(java.util.concurrent.atomic)原子类型都是使用CAS来实现乐观锁的。

  • 特点: CAS算法是非阻塞的,当有多个线程对内存中的数据进行CAS操作时,CPU能保证只会有一个线程能更新成功,其余线程并不会阻塞,而是继续尝试获取更新,当然也可以主动放弃。因此不可能出现死锁的情况。也就是说无锁操作天生免疫死锁。

  • 原理:在Java中可以通过Unsafe类实现CAS操作,而Unsafe类最终调用的是native方法,即在Java中通过JNI调用C或者C+系统函数,来调用CPU提供的cmpxchgl指令实现的原子性的。

CAS算法的过程是这样:它包含3个参数 CAS(V、E、N)

  • 基本思想:将内存位置的值V与预期值E比较,如果V=E,那么处理器会自动将V更新为A。否则,处理器不做任何操作。
    • V:要更新的变量
    • E:预期值
    • N:新值
      .

CAS操作可以分为3个步骤:
1)将预期值E与内存中的值V比较;
2)如果V值不等于E值,说明其他线程做了更新,那么什么也不做,如果E与V的值相等,那么就将V的值设置为N(保证了新值总是基于最新的信息计算的)
3)返回操作是否成功。

  • 什么是预期值?:也就是你认为现在变量应该是什么样子,如果变量不是你想象的那样,那说明已经被别人修改过。你就重新读取,再次尝试修改即可。
  • 简单的来说CAS就是在一个死循环中判断预期的值E和内存中的值V是否相等,相等的话就将V修改为N成功后退出循环,如果不相等的话就继续循环直到E等于V并且更新V=N成功退出循环
  • 注意:CAS有循环开销大的问题,因为会一直循环到预期值E和内存值V相等修改成功。同时CAS只能保证一个共享变量的原子性的问题。不过在JDK1.5之后加入了“AtomicReference类“来保证引用对象之间的原子性。

了解Unsafe类是什么 1.JUC之魔法类(Unsafe)解析

看我的这篇文章【Java多线程】JUC之魔法类(Unsafe)解析就行了,很适合小白入门

2.Unsafe实现CAS的核心 API
1.compareAndSwapObject
2.compareAndSwapInt
3.compareAndSwapInt

原子变量提供的原子性来自CAS操作,CAS来自Unsafe提供的api,然后由CPU的cmpxchg 指令来保证。

  • (cmpxchg是汇编指令,作用:比较并交换操作数)
Atomic工具包

Atomic工具包是JDK1.5出现的并发工具,主要用于保证变量的原子性和可见性。

1.常用原子类型

普通原子类型:提供对boolean、int、long和对象的原子性操作。

AtomicBoolean
AtomicInteger
AtomicLong
AtomicReference

原子类型数组:提供对数组元素的原子性操作。

AtomicLongArray
AtomicIntegerArray
AtomicReferenceArray

原子类型字段更新器:提供对指定对象的指定字段进行原子性操作。

AtomicLongFieldUpdater
AtomicIntegerFieldUpdater
AtomicReferenceFieldUpdater

带版本号的原子引用类型:以版本戳的方式解决原子类型的ABA问题。

AtomicStampedReference
AtomicMarkableReference

原子累加器(JDK1.8):AtomicLong和AtomicDouble的升级类型,专门用于数据统计,性能更高。

DoubleAccumulator
DoubleAdder
LongAccumulator
LongAdder
2.使用案例 2.1.测试原子累加和非原子累加
public class AtomicIntegerTest {
    public static AtomicInteger total = new AtomicInteger();
    public static Integer count = 0;
    private static Integer threadNum = 1000;
    public static CountDownLatch downLatch = new CountDownLatch(threadNum);

    public static void main(String[] args) throws InterruptedException {
        Long startTime = System.currentTimeMillis();
         // 创建1000个线程,每个线程中的run()方法将count累加1000次
        for (int i = 0; i < threadNum; i++) {
            new Thread(() -> {
              // 每个线程累加1000次
                for (int j = 0; j < threadNum; j++) {
                	//原子自增
                    total.getAndIncrement();
                    //非原子自增
                    count++;
                }
                //加入等待
                downLatch.countDown();
            }).start();
        }

        //等待所有线程完成
        downLatch.await();
        System.out.println("非原子:"+count);
        System.out.println("原子:"+total.get());
    }
}

执行结果:

  1. 创建 1000 个线程分别对total进行累加,由于没有对 total 做任何线程安全操作所以结果一定是小于或者等于 1000000

  2. 通过 Integer 的原子类,AtomicInteger 类来进行自增操作,然后调用getAndIncrement(),保证每次都是原子操作。最终的结果输出:1000000。

2.2.使用Unsafe属性进行原子操作
  1. 获取属性偏移量
  2. 通过 CAS 方式进行修改
public class UnsafeTest {
    public static Unsafe U;

    static {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            U = (Unsafe)f.get(null);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException, InstantiationException {
        User user = new User();
        // 获取字段
        Field age = user.getClass().getDeclaredField("age");
        // 获取字段相对Java对象的"起始地址"的偏移量
        long offset = U.objectFieldOffset(age);
        // cas设置值,如果age的内存值等于10,就将age的内存值修改为20,返回true,否则返回false
        boolean success = U.compareAndSwapInt(user, offset, 10, 20) ;
        System.out.println("修改结果: " + success ) ;
        // 打印数据
        System.out.println("查询结果: " + user.getAge());
    }
}

class User {
    private int age;

    public User() {
        this.age = 10;
    }
    
    public int getAge() { return age; }
}

执行结果

AtomicInteger源码浅析
public class AtomicInteger extends Number implements java.io.Serializable {    
    private static final long serialVersionUID = 6214790243416807050L;     
    // 获取指针类Unsafe    
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    //变量value在AtomicInteger实例对象内的内存偏移量    
    private static final long valueOffset;
    static {
          try {           
              //通过unsafe类的objectFieldOffset()方法,获取value变量在对象内存中的偏移量           
              //通过该偏移量valueOffset,使用unsafe类的内部方法可以直接对内存中value进行取值或赋值操作            
              valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));        
          } catch (Exception ex) { throw new Error(ex); }    
    }   
    //当前AtomicInteger封装的int变量value,该属性通过volatile保证其在线程间是可见性。
    private volatile int value;
    //构造方法   
    public AtomicInteger(int initialValue) {        
        value = initialValue;    
    }    
    public AtomicInteger() {   
    }   
    //获取当前最新值    
    public final int get() {        
        return value;    
    }    
    //设置当前值,具备volatile效果,方法用final修饰是为了更进一步的保证线程安全。    
    public final void set(int newValue) {        
        value = newValue;    
    }    
    //最终会设置成newValue,使用该方法后可能导致其他线程在之后的一小段时间内可以获取到旧值,有点类似于延迟加载
    public final void lazySet(int newValue) {        
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }
    //设置新值并获取旧值,底层调用的是CAS操作即unsafe.compareAndSwapInt()方法
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }   
    //如果当前值为预期值expect,则设置为update(当前值指的是value变量)    
    public final boolean compareAndSet(int expect, int update) { 
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);    
    }    
    //当前值加1返回旧值,底层CAS操作    
    public final int getAndIncrement() { 
        return unsafe.getAndAddInt(this, valueOffset, 1);   
    }    
    //当前值减1,返回旧值,底层CAS操作    
    public final int getAndDecrement() {        
        return unsafe.getAndAddInt(this, valueOffset, -1);    
    }   
    //当前值增加delta,返回旧值,底层CAS操作    
    public final int getAndAdd(int delta) {        
          return unsafe.getAndAddInt(this, valueOffset, delta);    
    }    
    //当前值加1,返回新值,底层CAS操作
    public final int incrementAndGet() {        
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;    
    }    
    //当前值减1,返回新值,底层CAS操作    
    public final int decrementAndGet() {        
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;    
    }   
    //当前值增加delta,返回新值,底层CAS操作    
    public final int addAndGet(int delta) {        
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;    
    }   
    //省略一些不常用的方法....
 }

AtomicInteger是基于Unsafe类中CAS相关操作实现的,是无锁操作。

public final int getAndIncrement() { 
    return unsafe.getAndAddInt(this, valueOffset, 1);   
}

调用了Unsafe类中的getAndAddInt()方法,该方法执行一个CAS操作,保证变量的原子性。

  • 可看出getAndAddInt通过一个do-while循环不断的重试更新要设置的值,直到成功为止,调用的是Unsafe类中的compareAndSwapInt方法,是一个CAS操作方法。
    • 即: 如果var1+var2在内存的值等于var5 ,修改var1+var2在内存的值为var5 + var4(也就是内存值+=1),返回true,如果不相等则返回false,继续循环尝试更新
  • 如果将compareAndSwapInt(var1, var2, var5, var5 + var4)换成compareAndSwapInt(obj, offset, expect, update)就比较清楚了,意思就是如果obj内的value和expect相等,就证明没有其他线程改变过这个变量,那么就更新它为update,如果这一步的CAS没有成功,那就采用自旋的方式继续进行CAS操作,取出乍一看这也是2个步骤了啊,其实是在JNI里调用C的系统函数发送CPU指令cmpxchgl完成的。所以还是原子操作。
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

上述源码分析是基于JDK1.8的,如果是1.8之前的方法实现如下`:

//JDK 1.7的源码,由for的死循环实现,并且直接在AtomicInteger实现该方法,
//JDK1.8后,该方法实现已移动到Unsafe类中,直接调用getAndAddInt方法即可
public final int incrementAndGet() {    
    for (;;) {        
        int current = get();        
        int next = current + 1;        
        if (compareAndSet(current, next))            
            return next;    
    }
}
CAS缺陷 1.CAS缺点有哪些?
  1. 循环时间长时开销大:如果CAS不成功,则会原地自旋,因为会一直循环到预期值E和内存值V相等修改成功,如果长时间自旋会给CPU带来非常大的执行开销。果长时间自旋会给CPU带来非常大的执行开销。

  2. 只能保证一个共享变量的原子操作:当对1个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有2个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。在JDK1.5之后加入了“AtomicReference类“来保证引用对象之间的原子性。可以把多个变量放在一个对象里来进行CAS操作。

  3. ABA问题
    如果内存地址V初次读取的值是A,并且在准备赋值的时候检查到它的值仍然为A,那我们就能说它的值没有被其他线程改变过了吗?

    • 如果在这段期间它的值曾经被改成了B,后来又被改回为A,那CAS操作就会误认为它从来没有被改变过。无法正确判断这个变量是否已被修改过,一般称这种情况为ABA问题。

      即线程1在把A的值改成了B,线程2又把B改回了A,此时再来一个线程3操作时发现A的值并没有改变,但实际上A的值是有被操作过的,为了避免在这种情况下CAS锁失效.

    • . ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A-B-A就会变成1A-2B-3A。在JDK1.5之后加入了“AtomicReference类“来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。 解决了多线程反复读写时,无法预知值是否已被修改的问题。

因此,在使用CAS前要考虑清楚“ABA”问题是否会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更高效。

2.ABA问题的解决方案之AtomicStampedReference

在JDK1.5后.Atomic包加入了 AtomicStampedReference类通过给每个变量加入了一个版本号来避免ABA问题。每次修改变量值的操作,都会比对当前值和期望值,当前版本号和预期版本号,只有二者都相同是才能更新成功

  • 底层实现:由一个自定义键值对Pair存储对象引用reference和版本号stamp,并构造volatile修饰的私有实例;更新数据时,不但会对比当前值和期望值,还会对比当前版本号和预期版本号,只有二者都相同,才会调用Unsafe的compareAndSwapObject方法执行数值和版本号替换。

  • 它的构造方法有 2 个参数(初始值,初始版本号),在修改数据时需提供一个新的值和一个新的版本号,这样在多线程情况下只要数据被修改了那么版本号一定会发生改变,另一个线程拿到的是旧的版本号所以会修改失败。

    tips: AtomicMarkableReference:带boolean值标识的原子引用类型,true和false两种切换状态表示是否被修改。不靠谱。

在此之前我们先模拟一个 ABA 问题:

public class ABATest {
	//初始原子类型的值为10
    private static AtomicInteger index = new AtomicInteger(10);

    public static void main(String[] args) {
        new Thread(() -> {
        	//如果内存值为10,则修改为101
            index.compareAndSet(10, 101);
            //如果内存值为101,则修改为10
            index.compareAndSet(101, 10);
            System.out.println(Thread.currentThread().getName() + ": 10->101->10");
        }, "zhangSan").start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //如果内存值为10,则修改为1000,并返回修改结果
            boolean result = index.compareAndSet(10, 1000);
            System.out.println(Thread.currentThread().getName() + ": 更新结果:" + result + ", 更新后的值: " + index.get());
        }, "liSi").start();
    }
}

执行结果

通过 AtomicStampedReference来解决这个问题: 就是我们在进入第2个线程之前,先读取当前的版本号,然后进入更新。这个读取的版本号可能是一个旧的值。如果出现这种情况,那么我们执行compareAndSet就会返回失败。

public class ABATest2 {
	//初始原子类型的值为10,版本号为1
    private static AtomicStampedReference index = new AtomicStampedReference(10, 1);

    public static void main(String[] args) {
        new Thread(() -> {
        	//如果内存值为10,且内存版本号相等,则修改为101,且版本号+1
            index.compareAndSet(10, 101, index.getStamp(), index.getStamp() + 1);
            //如果内存值为101,,且内存版本号相等,则修改为10,且版本号+1
            index.compareAndSet(101, 10, index.getStamp(), index.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + ": 10->101->10");
        }, "zhangSan").start();

		//获取当前版本号
        int stamp = index.getStamp();
        
        new Thread(() -> {、
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //如果内存值为10,且内存版本号相等,则修改为1000,且版本号+1,返回true,否则什么也不做,返回false
            boolean result = index.compareAndSet(10, 1000, stamp, stamp + 1);
            System.out.println(Thread.currentThread().getName() + ": 更新结果:" + result + ", 更新后的值: " + index.getReference());
        }, "liSi").start();
    }
}

执行结果

拓展 1.CAS实现单例模式
public class Singleton {
    private static AtomicReference singletonAtomicReference = new AtomicReference<>();

    private Singleton() {
    }

    public static Singleton getInstance() {
        while (true) {
            // 获得singleton
            Singleton singleton = singletonAtomicReference.get();

            // 如果singleton不为空,就返回singleton
            if (singleton != null) {
                return singleton;
            }

            // 如果singleton为空,创建一个singleton
            singleton = new Singleton();

            // CAS操作,预期值是NULL,新值是singleton
            // 如果成功,返回singleton
            // 如果失败,进入第二次循环,singletonAtomicReference.get()就不会为空了(当前值等于null,更新当前值为singleton)
            if (singletonAtomicReference.compareAndSet(null, singleton)) {
                return singleton;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int threadNum = 1000;
        CountDownLatch downLatch = new CountDownLatch(threadNum);

        for (int i = 0; i < threadNum; i++) {
            new Thread(() -> {
                for (int j = 0; j < threadNum; j++) {
                    System.out.println(Singleton.getInstance());
                }
                //加入等待
                downLatch.countDown();
            }).start();
        }

        //等待所有线程完成
        downLatch.await();
        System.out.println("执行结束");
    }
}
2.使用Unsafe实现一个简单原子类型

测试原子和非原子操作

public class MyAtomicInteger {

    private static long offset;//偏移地址
    private static Unsafe unsafe;

    static {
        try {
            Field theUnsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafeField.setAccessible(true);
            unsafe = (Unsafe) theUnsafeField.get(null);
            Field field = MyAtomicInteger.class.getDeclaredField("value");
            offset = unsafe.objectFieldOffset(field);//获得偏移地址
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private volatile int value;


    public void increment() {
        int tempValue;
        do {
            tempValue = unsafe.getIntVolatile(this, offset);//拿到值
        } while (!unsafe.compareAndSwapInt(this, offset, tempValue, value + 1));//CAS自旋
    }

    public int get() {
        return value;
    }


    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, offset, 1) + 1;
    }

    
    public final int getAndAddInt(Object obj, long offset, int expect) {
        int tempValue;
        do {
            tempValue = unsafe.getIntVolatile(obj, offset);
            //实例,示例属性的偏移量,预期值,更新的值
            //如果偏移量的值等于预期值值,更新offset=tempValue + expect,返回true,否则不更新offset,返回false
        } while (!unsafe.compareAndSwapInt(obj, offset, tempValue, tempValue + expect));//CAS自旋

        return tempValue;
    }
}
public class MyAtomicIntegerTest {
    public static MyAtomicInteger total = new MyAtomicInteger();
    public static Integer count = 0;
    private static Integer threadNum = 1000;
    public static CountDownLatch downLatch = new CountDownLatch(threadNum);

    public static void main(String[] args) throws InterruptedException {
        Long startTime = System.currentTimeMillis();
        for (int i = 0; i < threadNum; i++) {
            new Thread(() -> {
                for (int j = 0; j < threadNum; j++) {
                    total.incrementAndGet();
                    count++;
                }
                //加入等待
                downLatch.countDown();
            }).start();
        }

        //等待所有线程完成
        downLatch.await();
        System.out.println("非原子:"+count);
        System.out.println("原子:"+total.get());
    }
}

执行结果


  • CAS、原子操作类的应用与浅析及Java8对其的优化
  • Java CAS 底层原理剖析
  • 初识CAS的实现原理
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/584279.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号