栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java并发 结合源码分析AQS原理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java并发 结合源码分析AQS原理

前言:

如果说J.U.C包下的核心是什么?那我想答案只有一个就是AQS。那么AQS是什么呢?接下来让我们一起揭开AQS的神秘面纱

AQS是什么?

AQS是AbstractQueuedSynchronizer的简称。为什么说它是核心呢?是因为它提供了一个基于FIFO的队列和state变量来构建锁和其他同步装置的基础框架。下面是其底层的数据结构。

AQS的特点

1、其内使用Node实现FIFO(FirstInFirstOut)队列。可用于构建锁或者其他同步装置的基础框架

2、且利用了一个int类表示状态。在AQS中维护了一个volatile int state,通常表示有线程访问资源的状态,当state>1的时候表示线程重入的数量,主要有三个方法控制:getState(),setState(),CompareAndSetState()。后面的源码分析多用到这几个方法

3、使用方法是继承,子类通过继承并通过实现它的方法管理其状态(acquire和release)的方法操纵状态。

4、同时实现排它锁和共享锁模式。实际上AQS功能主要分为两类:独占(只有一个线程能执行)和共享(多个线程同时执行),它的子类要么使用独占功能要么使用共享功能,而ReentrantLock是通过两个内部类来实现独占和共享

CountDownLatch如何借助AQS实现计数功能?

先来说一下CountDownLatch,CountDownLatch是一个同步辅助类,通过它可以来完成类似阻塞当前线程的功能,即一个或多个线程一起等待,直到其他线程执行的操作完成。要实现上面的功能,CountDownLatch是通过一个给定的原子操作的计数器来实现。调用该类的await()方法的线程会一直处于阻塞状态,直到其他线程调用countDown()方法使得计数器的值变为0之后线程才会执行,这个计数器是不能被重置的。通常这个类会用在程序执行需要等待某个条件完成的场景,比如说并行计算,可将一个数据量很大的计算拆分成一个个子任务,当子任务完成之后,再将最终的结果汇总。每次访问CountDownLatch只能有一个线程,但是这个线程在使用完countDown()方法之后能多个线程能继续运行,而调用await()方法的线程就一定要计数器为0才会运行

下面来分析CountDownLatch的源码以及如何使用AQS框架

public class CountDownLatch {
  
  private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    //初始化内部类实际上是设置AQS的state
    Sync(int count) {
      setState(count);
    }

    int getCount() {
      return getState();
    }
    //尝试获取共享是看当前的state是否为0
    protected int tryAcquireShared(int acquires) {
      return (getState() == 0) ? 1 : -1;
    }
    
    protected boolean tryReleaseShared(int releases) {
      
      for (;;) {
 int c = getState();
 if (c == 0)
   return false;
 int nextc = c-1;
 if (compareAndSetState(c, nextc))
   return nextc == 0;
      }
    }
  }

  private final Sync sync;

  
  public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
  }

  
  public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }

  
  public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.tonanos(timeout));
  }

  
  public void countDown() {
    sync.releaseShared(1);
  }

  
  public long getCount() {
    return sync.getCount();
  }

  
  public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
  }
}

由上面代码可看见CountDownLatch实现了AQS的共享锁,原理是操作state来实现计数,并且重写了tryAcquireShared(),tryReleaseShared()等方法

Semaphore是如何借助AQS实现控制并发访问线程个数?

Semaphore的功能类似于操作系统的信号量,可以很方便的控制某个资源同时被几个线程访问,即做并发访问控制,与CountDownLatch类似,同样是实现获取和释放两个方法。Semaphore的使用场景:常用于仅能提供访问的资源,比如数据库的连接数最大只有30,而应用程序的并发数可能远远大于30,这时候就可以使用Semaphore来控制同时访问的线程数。当Semaphore控制线程数到1的时候就和我们单线程一样了。同样Semaphore说是信号量的意思,我们这里就可以把它理解为十字路口的红绿灯,可以控制车流量(这里是控制线程数)

下面来分析Semaphore的源码以及如何使用AQS框

public class Semaphore implements java.io.Serializable {  private static final long serialVersionUID = -3222578661600680210L;  
  private final Sync sync;

  
  abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
      setState(permits);
    }

    final int getPermits() {
      return getState();
    }
    
    final int nonfairTryAcquireShared(int acquires) {
      for (;;) {
 int available = getState();
 int remaining = available - acquires;
 if (remaining < 0 ||
   compareAndSetState(available, remaining))
   return remaining;
      }
    }
      
    protected final boolean tryReleaseShared(int releases) {
      for (;;) {
 int current = getState();
 int next = current + releases;
 if (next < current) // overflow
   throw new Error("Maximum permit count exceeded");
 if (compareAndSetState(current, next))
   return true;
      }
    }

    final void reducePermits(int reductions) {
      for (;;) {
 int current = getState();
 int next = current - reductions;
 if (next > current) // underflow
   throw new Error("Permit count underflow");
 if (compareAndSetState(current, next))
   return;
      }
    }

    final int drainPermits() {
      for (;;) {
 int current = getState();
 if (current == 0 || compareAndSetState(current, 0))
   return current;
      }
    }
  }

  
  static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
      super(permits);
    }

    protected int tryAcquireShared(int acquires) {
      return nonfairTryAcquireShared(acquires);
    }
  }

  
  static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
      super(permits);
    }

    protected int tryAcquireShared(int acquires) {
      for (;;) {
 if (hasQueuedPredecessors())
   return -1;
 int available = getState();
 int remaining = available - acquires;
 if (remaining < 0 ||
   compareAndSetState(available, remaining))
   return remaining;
      }
    }
  }

  
  public Semaphore(int permits) {
    sync = new NonfairSync(permits);
  }

  
  public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  }

  
  public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }

  
  public void acquireUninterruptibly() {
    sync.acquireShared(1);
  }

  
  public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
  }

  
  public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.tonanos(timeout));
  }

  
  public void release() {
    sync.releaseShared(1);
  }

  
  public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
  }

  public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
  }

  public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
  }

  public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.tonanos(timeout));
  }

  
  public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
  }

  
  public int availablePermits() {
    return sync.getPermits();
  }

  
  public int drainPermits() {
    return sync.drainPermits();
  }

  protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
  }

  public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
  }

  public final int getQueueLength() {
    return sync.getQueueLength();
  }

  protected Collection getQueuedThreads() {
    return sync.getQueuedThreads();
  }
  public String toString() {
    return super.toString() + "[Permits = " + sync.getPermits() + "]";
  }}

上面对于Semaphore的一些重要内部类和常用方法进行了解释,与CountDownLatch很类似,实现的都是共享的功能,即Semaphore允许得到许可证的线程同时执行,而CountDownLatch允许调用countDown()方法的线程同时执行。并且都是通过内部类实现的。相信看到这里,你能越来越看见AQS为什么被称作JUC包的核心。下面就来介绍一下ReentrantLock

ReentrantLock是如何借助AQS实现锁机制

ReentrantLock是可重入锁,前面博客中写到synchronized实现的锁也是可重入的。不过synchronized是基于JVM指令实现,而ReentrantLock是使用Java代码实现的。ReentrantLock重点就是需要我们手动声明加锁和释放锁,如果手工忘记释放锁,很有可能就会导致死锁,即资源永远都被锁住,其他线程无法得到,当前线程也释放不出去。ReentrantLock实现的是自旋锁,通过循环调用CAS操作实现加锁,避免了线程进入内核态的阻塞状态,所以性能较好。ReentrantLock内部同样实现了公平锁和非公平锁。事实上Synchronized能做的ReentrantLock都能做,但是反过来就不一样了、

经过前面的源码分析我们发现核心的都在当前类的内部类里,而当前类的一些方法不过是使用的内部类以及AQS的方法罢了,所以下面我们就来分析ReentrantLock中的三个内部类。

public class ReentrantLock implements Lock, java.io.Serializable {
  private static final long serialVersionUID = 7373984872572414699L;
  
  private final Sync sync;

  
  abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    
    abstract void lock();

    
    final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
 if (compareAndSetState(0, acquires)) {
   setExclusiveOwnerThread(current);
   return true;
 }
      }
      else if (current == getExclusiveOwnerThread()) {
 int nextc = c + acquires;
 if (nextc < 0) // overflow
   throw new Error("Maximum lock count exceeded");
 setState(nextc);
 return true;
      }
      return false;
    }
    
    protected final boolean tryRelease(int releases) {
      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
 throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) {
 free = true;
 setExclusiveOwnerThread(null);
      }
      setState(c);
      return free;
    }
    
    protected final boolean isHeldExclusively() {
      return getExclusiveOwnerThread() == Thread.currentThread();
    }
    
    final ConditionObject newCondition() {
      return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
      return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
      return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
      return getState() != 0;
    }

    
    private void readObject(java.io.ObjectInputStream s)
      throws java.io.IOException, ClassNotFoundException {
      s.defaultReadObject();
      setState(0); 
    }
  }

  
  static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    
    final void lock() {
      if (compareAndSetState(0, 1))
 setExclusiveOwnerThread(Thread.currentThread());
      else
 acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
    }
  }

  
  static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    
    final void lock() {
      acquire(1);
    }

    
    protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
 if (!hasQueuedPredecessors() &&
   compareAndSetState(0, acquires)) {
   setExclusiveOwnerThread(current);
   return true;
 }
      }
      else if (current == getExclusiveOwnerThread()) {
 int nextc = c + acquires;
 if (nextc < 0)
   throw new Error("Maximum lock count exceeded");
 setState(nextc);
 return true;
      }
      return false;
    }
  }
}

ReentrantLock和上面两个类最不同的莫过于ReentrantLock使用的是独占功能,即每次只能有一个线程来获取ReentrantLock类。ReentrantLock类下还有很多方法,这里就不一一介绍,但是本质都是内部类中的实现以及AQS的一些调用

总结:

AQS只是一个基础的框架,里面最核心的就是维护了state变量和CHL队列,而其他的类全部都是通过继承的方法进行扩展,虽然没有直接说源码,但是通过上面三个主要类的源码分析再去看AQS已经不是难事。继承主要改变的就是获取和释放的方法,通过这两个方法来对state和队列进行操作达到我们能够进行的并发控制的功能,事实上J.U.C包下的类和能够实现的功能远不止这三个,后面会选择重点的来介绍。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/136682.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号