栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Java多线程】JUC之深入队列同步器(AQS)(五)Semaphore(信号量)源码解析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Java多线程】JUC之深入队列同步器(AQS)(五)Semaphore(信号量)源码解析

文章目录
  • 一.承接上文
  • 二.Semaphore是什么?
  • 三.Semaphore源码解析
  • 四.Semaphore使用

一.承接上文

【Java多线程】JUC之深入队列同步器(AQS)(一)实现细节解析

二.Semaphore是什么?

  • 位于【Java多线程】JUC之显示锁(Lock)与初识AQS(队列同步器)
三.Semaphore源码解析
public class Semaphore implements java.io.Serializable {
    //---------------成员变量start---------------
    
    private final Sync sync;
    //---------------成员变量end---------------
    
    //------------------------构造器start------------------------------
    
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    //------------------------构造器end------------------------------


    //------------------------对外开放>获取许可方法start------------------------------
    
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.acquireSharedInterruptibly(permits);
    }

    
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.acquireShared(permits);
    }

    
    public boolean tryAcquire() {
        //返回许可数>=0获取许可成功
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    
    public boolean tryAcquire(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
    
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }
    
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    //------------------------对外开放>获取许可方法end------------------------------


    //------------------------对外开放>释放并回收许可方法start------------------------------
    
    public void release() {
        sync.releaseShared(1);
    }
    
    public void release(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.releaseShared(permits);
    }
    //------------------------对外开放>释放并回收许可方法end------------------------------


    //------------------------对外开放>强制回收许可方法-start-----------------------------
    
    public int drainPermits() {
        return sync.drainPermits();
    }
    
    protected void reducePermits(int reduction) {
        if (reduction < 0) {
            throw new IllegalArgumentException();
        }
        sync.reducePermits(reduction);
    }
    //------------------------对外开放>强制回收许可方法-end-----------------------------


    //------------------------内部类>非公平AQS同步器start------------------------------
    
    static final class NonfairSync extends Sync {
        
        NonfairSync(int permits) {
            super(permits);
        }

        
        @Override
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    //------------------------内部类>非公平AQS同步器end------------------------------


    //------------------------内部类>公平AQS同步器start------------------------------
    
    static final class FairSync extends Sync {
        
        FairSync(int permits) {
            super(permits);
        }

        
        @Override
        protected int tryAcquireShared(int acquires) {
            //自旋
            for (;;) {
                //hasQueuedPredecessors()方法表示前面是否有等待线程。一旦前面有等待线程,那么为了遵循公平,按照FIFO顺序获取锁
                //有则返回true表示线程需要排队,没有则返回false则表示线程无需排队。
                if (hasQueuedPredecessors()) {//和非公平锁区别在这里
                    return -1;//需要排队
                }
                //当前许可数
                int available = getState();
                //剩余许可数 = 当前许可数-获取数
                int remaining = available - acquires;
                //剩余许可数小于0(进入同步队列,并阻塞)) 或者 CAS更新当前许可数为剩余许可数成功,则退出返回剩余许可数,否则重试
                if (remaining < 0 ||
                        compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
    }
    //------------------------内部类>公平AQS同步器end------------------------------


    //------------------------内部类>AQS同步器抽象实现start------------------------------
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);//设置同步状态
        }

        
        final int getPermits() {
            return getState();
        }

        
        final int nonfairTryAcquireShared(int acquires) {
            //自旋
            for (;;) {
                //当前许可数
                int available = getState();
                //剩余许可数(递减后值) = 当前许可数-获取数
                int remaining = available - acquires;
                //递减后值小于0(进入同步队列,并阻塞) 或者 大于0,且CAS更新当前许可数为剩余许可数成功,,则退出返回剩余许可数,否则重试
                if (remaining < 0 ||
                        compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        
        final void reducePermits(int reductions) {
            //自旋
            for (;;) {
                //获取当前许可数
                int current = getState();
                //递减后的值 = 当前许可数-减少许可数
                int next = current - reductions;
                //递减后的值大于当前许可数,抛出错误,结束程序运行
                if (next > current) {
                    throw new Error("Permit count underflow");
                }
                //反之,剩余许可数正常,CAS更新剩余许可数为最新许可数,并退出
                if (compareAndSetState(current, next)) {
                    return;
                }
            }
        }

        
        final int drainPermits() {
            //自旋
            for (;;) {
                //获取当前许可数
                int current = getState();
                //当前许可数为0 或者 不为0时CAS重置许可数为0成功,返回当前许可数(可能current并不为0),则退出,否则重试
                if (current == 0 || compareAndSetState(current, 0)) {
                    return current;
                }
            }
        }

        
        @Override
        protected boolean tryReleaseShared(int releases) {
            //自旋
            for (;;) {
                //获取当前许可数
                int current = getState();
                //最大许可数(累加后值)=当前许可数+释放许可数,超过最大计数
                int next = current + releases;

                //累加后值 小于 当前许可数+释放许可数 小于当前许可数,抛出错误,结束程序运行
                if (next < current) {// overflow
                    throw new Error("Maximum permit count exceeded");
                }
                //最大许可数 大于等于当前许可数,CAS更新许可数为最大许可数成功,则退出,否则重试
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }
    }

    //------------------------内部类>AQS同步器抽象实现end------------------------------


    //------------------------对外开放>获取相关状态方法start------------------------------
    
    public int availablePermits() {
        return sync.getPermits();
    }
    
    public boolean isFair() {
        return sync instanceof FairSync;
    }
    
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    
    public final int getQueueLength() {
        return sync.getQueueLength();
    }
    
    protected Collection getQueuedThreads() {
        return sync.getQueuedThreads();
    }
    @Override
    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
    //------------------------对外开放>获取相关状态方法end------------------------------
}

四.Semaphore使用

  • 位于【Java多线程】JUC之显示锁(Lock)与初识AQS(队列同步器)

添加微信,一起讨论Java、健身、养猫知识,哈哈哈
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/681050.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号