信号量(Semphore)用于控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量,以保证有限资源的合理使用。
Semphore实现思路Semphore基于AQS队列同步器实现,他管理着一组虚拟许可(permit),许可通过Semphore的构造函数初始化。当程序执行指定操作前,需要先尝试获取许可,如果没有剩余许可,则当前线程进入阻塞状态,直到有许可位置。操作执行完毕后,将释放一个许可。当许可初始值设为1时,可以当作不可重入的互斥锁使用(通篇读过后不难理解)。
Semphore源码分析此部分忽略部分无关代码,省略部分统一用"//…"表示,只梳理主要功能流程思路。
构造方法与静态内部类Semphore提供了公平策略和不公平策略,二者差异体现在尝试获取许可时的策略不同,Semphore内中的类NonfairSync和FairSync就是用来实现两种不同策略的,首先关注一下Semphore的构造方法和静态内部类Sync、NonfairSync、FairSync,代码如下:
private final Sync sync;
//由此可见Semphore基于AQS实现
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
//以下方法省略 后续进行分析
final int nonfairTryAcquireShared(int acquires) {
//...
}
protected final boolean tryReleaseShared(int releases) {
//...
}
final void reducePermits(int reductions) {
//...
}
final int drainPermits() {
//...
}
}
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
//...
}
}
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
内部类继承关系如下如所示:
除了代码内注释外,还需要关注一点,Sync的两个子类实现不同策略的关键实际在于对tryAcquireShared()方法的重写,公平策略内直接在该方法内写出实现逻辑,而非公平策略调用了父类Sync中的nonfairTryAcquireShared()方法,非公平策略的实现逻辑就在这个方法内体现。
这样做的好处体现在,无论选用哪种策略,使用者无需关心内部如何实现,只要直接调用acquire()(获取许可的方法,下文分析)方法就可以满足需求。
获取许可与释放许可的逻辑流程接下来重点关注Semaphore中用于获取许可方法acquire()和用于释放许可方法release()
首先关注获取许可的相关方法
首先关注获取许可时最为常用的方法acquire()及相关核心实现:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//与无参acquire()无太大差异 只是制定了当前线程要获取的许可数量
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
非公平策略下的tryAcquireShared方法及后续流程如下:
//以下两行为acquireSharedInterruptibly方法内的一部分 关注判断条件
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
//此方法是NonfairSync的成员方法 负责调用父类方法nonfairTryAcquireShared()
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
//此方法是Sync的成员方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
公平策略下的tryAcquireShared方法及后续流程如下:
//此方法是FairSync的成员方法
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
除了上述的两个acquire方法,Semphore还提供了三种其他的acquire方法,代码如下:
//acquire()是一个无限定时间、可被中断且阻塞获取许可的方法
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
//有一个自定义获取许可数量的版本
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
//同样有一个自定义获取许可数量的版本
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//同样有一个自定义获取许可数量的版本
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
释放许可
接下来关注用于释放许可的方法release()及相关核心实现:
//release()是Semphore的成员方法 默认释放一个许可
public void release() {
sync.releaseShared(1);
}
//释放指定许可数量版本
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
//releaseShared(int arg)是AbstractQueuedSynchronizer的成员方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
其余方法
除主要流程外其他方法介绍:
//返回当前剩余许可数
public int availablePermits() {
return sync.getPermits();
}
public int drainPermits() {
return sync.drainPermits();
}
//减少许可数量
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
//判断当前公平策略
public boolean isFair() {
return sync instanceof FairSync;
}
//判断当前Semaphore实例上是否存在正在等待获取许可的线程
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
//返回当前Semaphore实例上是正在等待许可的线程数量
public final int getQueueLength() {
return sync.getQueueLength();
}
//返回当前Semaphore实例上是正在等待许可的线程
protected Collection getQueuedThreads() {
return sync.getQueuedThreads();
}
以上便是Semphore全部源码分析内容
示例 1.使用Semphore将容器变为有界阻塞容器注:示例源于《Java并发编程实战》程序清单5-14
代码简单易懂,仅添加部分注释,此处不做测试
public class BoundedHashSet2.使用Semphore模拟数据库连接池{ private final Set set; private final Semaphore sem; //此处的bound实际为容器容量上限 public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<>()); sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException{ //向容器添加元素之前 必须先获得一个许可 //同时此处采用acquire 如果没有剩余容量则会进入阻塞状态 sem.acquire(); boolean wasAdded = false; try{ wasAdded = set.add(o); return wasAdded; }finally { //保证添加失败时释放许可 if(!wasAdded) sem.release(); } } public boolean remove(Object o){ boolean wasRemoved = set.remove(o); if(wasRemoved) //移除元素成功则释放一个许可 sem.release(); return wasRemoved; } }
下列代码思路来自https://blog.csdn.net/cy_Alone/article/details/70193658,这里做了部分无关痛痒的修改,代码如下:
public class ConnectPool {
//连接id
private static int connectId = 1;
//连接池大小/Semaphore内许可数量
private int size;
private Vector connects;
private boolean [] isConnectionUsed;
private final Semaphore semaphore;
static final class Connect{
//连接唯一id
private final int id = connectId++;
public Connect() {
try {
//模拟连接启动耗时
Thread.sleep(500);
} catch (InterruptedException e) { }
System.out.println("Connect" + id + "成功启动");
}
}
public ConnectPool(int size) {
this.size = size;
semaphore = new Semaphore(size);
connects = new Vector();
isConnectionUsed = new boolean[size];
initConnects();
}
//初始化数据库连接池
private void initConnects(){
for (int i = 0; i < size; i++) {
connects.add(new Connect());
}
}
//尝试获取数据库连接
public Connect tryAcquireConnect() throws InterruptedException{
semaphore.acquire();
return acquireConnect();
}
private synchronized Connect acquireConnect(){
for (int i = 0; i < size; i++) {
if(!isConnectionUsed[i]){
//标记该连接已被使用
isConnectionUsed[i] = true;
return (Connect) connects.get(i);
}
}
return null;
}
//释放某个数据库连接
public synchronized void releaseConnect(Connect connect){
for (int i = 0; i < size; i++) {
if(connect==connects.get(i)){
isConnectionUsed[i] = false;
semaphore.release();
}
}
}
}
测试代码如下:
public static void main(String[] args) {
//此处如果设置1 在其他场景就成了一种互斥锁 即同时只有一个线程可以访问
final ConnectPool pool = new ConnectPool(2);
for (int i = 0; i < 5; i++) {
int id = i + 1;
Thread thread = new Thread() {
@Override
public void run() {
try {
System.out.println("线程" + id + "等待获取数据库连接");
Connect connect = pool.tryAcquireConnect();
System.out.println("线程" + id + "已拿到数据库连接:" + connect);
//模拟数据库操作耗时
Thread.sleep(1000);
System.out.println("线程" + id + "释放数据库连接:" + connect);
pool.releaseConnect(connect);
} catch (InterruptedException e) { }
}
};
thread.start();
}
}
测试结果如下
Connect1成功启动 Connect2成功启动 线程1等待获取数据库连接 线程3等待获取数据库连接 线程2等待获取数据库连接 线程1已拿到数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e 线程4等待获取数据库连接 线程3已拿到数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@10fccd40 线程5等待获取数据库连接 线程1释放数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e 线程2已拿到数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e 线程3释放数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@10fccd40 线程4已拿到数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@10fccd40 线程2释放数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e 线程5已拿到数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e 线程4释放数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@10fccd40 线程5释放数据库连接:ConCurrent.ClassTest.Semaphore.ConnectPool$Connect@5ac7772e
以上便是本篇全部内容
作者才疏学浅,如文中出现纰漏,还望指正



