栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java信号量Semphore详解及源码分析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java信号量Semphore详解及源码分析

什么是信号量

信号量(Semphore)用于控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量,以保证有限资源的合理使用。

Semphore实现思路

Semphore基于AQS队列同步器实现,他管理着一组虚拟许可(permit),许可通过Semphore的构造函数初始化。当程序执行指定操作前,需要先尝试获取许可,如果没有剩余许可,则当前线程进入阻塞状态,直到有许可位置。操作执行完毕后,将释放一个许可。当许可初始值设为1时,可以当作不可重入的互斥锁使用(通篇读过后不难理解)。

Semphore源码分析

此部分忽略部分无关代码,省略部分统一用"//…"表示,只梳理主要功能流程思路。

构造方法与静态内部类

Semphore提供了公平策略和不公平策略,二者差异体现在尝试获取许可时的策略不同,Semphore内中的类NonfairSync和FairSync就是用来实现两种不同策略的,首先关注一下Semphore的构造方法和静态内部类Sync、NonfairSync、FairSync,代码如下:

	private final Sync sync;
	//由此可见Semphore基于AQS实现
	abstract static class Sync extends AbstractQueuedSynchronizer {
		
        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }
		//以下方法省略 后续进行分析
        final int nonfairTryAcquireShared(int acquires) {
			//...
        }

        protected final boolean tryReleaseShared(int releases) {
            //...
        }

        final void reducePermits(int reductions) {
            //...
        }

        final int drainPermits() {
            //...
        }
    }
    
	static final class NonfairSync extends Sync {

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
	
	static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            //...
        }
    }
	
	 public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
	

内部类继承关系如下如所示:

除了代码内注释外,还需要关注一点,Sync的两个子类实现不同策略的关键实际在于对tryAcquireShared()方法的重写,公平策略内直接在该方法内写出实现逻辑,而非公平策略调用了父类Sync中的nonfairTryAcquireShared()方法,非公平策略的实现逻辑就在这个方法内体现。

这样做的好处体现在,无论选用哪种策略,使用者无需关心内部如何实现,只要直接调用acquire()(获取许可的方法,下文分析)方法就可以满足需求。

获取许可与释放许可的逻辑流程

接下来重点关注Semaphore中用于获取许可方法acquire()和用于释放许可方法release()
首先关注获取许可的相关方法

获取许可

首先关注获取许可时最为常用的方法acquire()及相关核心实现:

	
	public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
	//与无参acquire()无太大差异 只是制定了当前线程要获取的许可数量
	public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
	
	public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            
        if (tryAcquireShared(arg) < 0)
        	 
            doAcquireSharedInterruptibly(arg);
    }

	

非公平策略下的tryAcquireShared方法及后续流程如下:

	//以下两行为acquireSharedInterruptibly方法内的一部分 关注判断条件
	if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
	
	//此方法是NonfairSync的成员方法 负责调用父类方法nonfairTryAcquireShared()
	protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
	//此方法是Sync的成员方法
	final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
        	
            int available = getState();
            int remaining = available - acquires;
            
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

公平策略下的tryAcquireShared方法及后续流程如下:

	//此方法是FairSync的成员方法
	protected int tryAcquireShared(int acquires) {
            for (;;) {
            	
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

除了上述的两个acquire方法,Semphore还提供了三种其他的acquire方法,代码如下:

	//acquire()是一个无限定时间、可被中断且阻塞获取许可的方法	

	
	public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
	//有一个自定义获取许可数量的版本
	public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }
	
	public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }
	//同样有一个自定义获取许可数量的版本
	public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
	
	public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }    
	//同样有一个自定义获取许可数量的版本
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }
释放许可

接下来关注用于释放许可的方法release()及相关核心实现:

	//release()是Semphore的成员方法 默认释放一个许可
	public void release() {
        sync.releaseShared(1);
    }
	//释放指定许可数量版本
	public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
	//releaseShared(int arg)是AbstractQueuedSynchronizer的成员方法
	public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
        	
            doReleaseShared();
            return true;
        }
        return false;
    }

	protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current)
                throw new Error("Maximum permit count exceeded");
            
            if (compareAndSetState(current, next))
                return true;
        }
    }
其余方法

除主要流程外其他方法介绍:

	//返回当前剩余许可数
	public int availablePermits() {
        return sync.getPermits();
    }
	 
	 public int drainPermits() {
        return sync.drainPermits();
    }

	//减少许可数量
	protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }
	
	//判断当前公平策略
	public boolean isFair() {
        return sync instanceof FairSync;
    }
	//判断当前Semaphore实例上是否存在正在等待获取许可的线程
	public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    //返回当前Semaphore实例上是正在等待许可的线程数量
    public final int getQueueLength() {
        return sync.getQueueLength();
    }
	//返回当前Semaphore实例上是正在等待许可的线程
	protected Collection getQueuedThreads() {
        return sync.getQueuedThreads();
    }

以上便是Semphore全部源码分析内容

示例 1.使用Semphore将容器变为有界阻塞容器

注:示例源于《Java并发编程实战》程序清单5-14
代码简单易懂,仅添加部分注释,此处不做测试

public class BoundedHashSet {
    
    private final Set set;
    private final Semaphore sem;
	//此处的bound实际为容器容量上限
    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<>());
        sem = new Semaphore(bound);
    }
    
    public boolean add(T o) throws InterruptedException{
    	//向容器添加元素之前 必须先获得一个许可 
    	//同时此处采用acquire 如果没有剩余容量则会进入阻塞状态
        sem.acquire();
        boolean wasAdded = false;
        try{
            wasAdded = set.add(o);
            return wasAdded;
        }finally {
        	//保证添加失败时释放许可
            if(!wasAdded)
                sem.release();
        }
    }
    
    public boolean remove(Object o){
        boolean wasRemoved = set.remove(o);
        if(wasRemoved)
        	//移除元素成功则释放一个许可
            sem.release();
        return wasRemoved;
    }
}
2.使用Semphore模拟数据库连接池

下列代码思路来自https://blog.csdn.net/cy_Alone/article/details/70193658,这里做了部分无关痛痒的修改,代码如下:

public class ConnectPool {
    //连接id
    private static int connectId = 1;

    //连接池大小/Semaphore内许可数量
    private int size;
    private Vector connects;

    private boolean [] isConnectionUsed;
    private final Semaphore semaphore;

    static final class Connect{
        //连接唯一id
        private final int id = connectId++;
        public Connect() {
            try {
                //模拟连接启动耗时
                Thread.sleep(500);
            } catch (InterruptedException e) { }
            System.out.println("Connect" + id + "成功启动");
        }
    }

    public ConnectPool(int size) {
        this.size = size;
        semaphore = new Semaphore(size);
        connects = new Vector();
        isConnectionUsed = new boolean[size];
        initConnects();
    }

    //初始化数据库连接池
    private void initConnects(){
        for (int i = 0; i < size; i++) {
            connects.add(new Connect());
        }
    }

    //尝试获取数据库连接
    public Connect tryAcquireConnect() throws InterruptedException{
        semaphore.acquire();
        return acquireConnect();
    }
    private synchronized Connect acquireConnect(){
        for (int i = 0; i < size; i++) {
            if(!isConnectionUsed[i]){
                //标记该连接已被使用
                isConnectionUsed[i] = true;
                return (Connect) connects.get(i);
            }
        }
        return null;
    }

    //释放某个数据库连接
    public synchronized void releaseConnect(Connect connect){
        for (int i = 0; i < size; i++) {
            if(connect==connects.get(i)){
                isConnectionUsed[i] = false;
                semaphore.release();
            }
        }
    }
}

测试代码如下:

public static void main(String[] args) {
        //此处如果设置1 在其他场景就成了一种互斥锁 即同时只有一个线程可以访问
        final ConnectPool pool = new ConnectPool(2);

        for (int i = 0; i < 5; i++) {
            int id = i + 1;
            Thread thread = new Thread() {
                @Override
                public void run() {
                    try {
                        System.out.println("线程" + id + "等待获取数据库连接");
                        Connect connect = pool.tryAcquireConnect();
                        System.out.println("线程" + id + "已拿到数据库连接:" + connect);
                        //模拟数据库操作耗时
                        Thread.sleep(1000);
                        System.out.println("线程" + id + "释放数据库连接:" + connect);
                        pool.releaseConnect(connect);
                    } catch (InterruptedException e) { }
                }
            };
            thread.start();
        }
    }

测试结果如下

Connect1成功启动
Connect2成功启动
线程1等待获取数据库连接
线程3等待获取数据库连接
线程2等待获取数据库连接
线程1已拿到数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e
线程4等待获取数据库连接
线程3已拿到数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@10fccd40
线程5等待获取数据库连接
线程1释放数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e
线程2已拿到数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e
线程3释放数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@10fccd40
线程4已拿到数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@10fccd40
线程2释放数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e
线程5已拿到数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e
线程4释放数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@10fccd40
线程5释放数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e

以上便是本篇全部内容


作者才疏学浅,如文中出现纰漏,还望指正

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/362549.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号