- 一.承接上文
- 二.Semaphore是什么?
- 三.Semaphore源码解析
- 四.Semaphore使用
【Java多线程】JUC之深入队列同步器(AQS)(一)实现细节解析
二.Semaphore是什么?三.Semaphore源码解析
- 位于【Java多线程】JUC之显示锁(Lock)与初识AQS(队列同步器)
public class Semaphore implements java.io.Serializable {
//---------------成员变量start---------------
private final Sync sync;
//---------------成员变量end---------------
//------------------------构造器start------------------------------
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//------------------------构造器end------------------------------
//------------------------对外开放>获取许可方法start------------------------------
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) {
throw new IllegalArgumentException();
}
sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
}
sync.acquireShared(permits);
}
public boolean tryAcquire() {
//返回许可数>=0获取许可成功
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
}
return sync.nonfairTryAcquireShared(permits) >= 0;
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) {
throw new IllegalArgumentException();
}
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//------------------------对外开放>获取许可方法end------------------------------
//------------------------对外开放>释放并回收许可方法start------------------------------
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
}
sync.releaseShared(permits);
}
//------------------------对外开放>释放并回收许可方法end------------------------------
//------------------------对外开放>强制回收许可方法-start-----------------------------
public int drainPermits() {
return sync.drainPermits();
}
protected void reducePermits(int reduction) {
if (reduction < 0) {
throw new IllegalArgumentException();
}
sync.reducePermits(reduction);
}
//------------------------对外开放>强制回收许可方法-end-----------------------------
//------------------------内部类>非公平AQS同步器start------------------------------
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
@Override
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
//------------------------内部类>非公平AQS同步器end------------------------------
//------------------------内部类>公平AQS同步器start------------------------------
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
@Override
protected int tryAcquireShared(int acquires) {
//自旋
for (;;) {
//hasQueuedPredecessors()方法表示前面是否有等待线程。一旦前面有等待线程,那么为了遵循公平,按照FIFO顺序获取锁
//有则返回true表示线程需要排队,没有则返回false则表示线程无需排队。
if (hasQueuedPredecessors()) {//和非公平锁区别在这里
return -1;//需要排队
}
//当前许可数
int available = getState();
//剩余许可数 = 当前许可数-获取数
int remaining = available - acquires;
//剩余许可数小于0(进入同步队列,并阻塞)) 或者 CAS更新当前许可数为剩余许可数成功,则退出返回剩余许可数,否则重试
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
//------------------------内部类>公平AQS同步器end------------------------------
//------------------------内部类>AQS同步器抽象实现start------------------------------
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);//设置同步状态
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
//自旋
for (;;) {
//当前许可数
int available = getState();
//剩余许可数(递减后值) = 当前许可数-获取数
int remaining = available - acquires;
//递减后值小于0(进入同步队列,并阻塞) 或者 大于0,且CAS更新当前许可数为剩余许可数成功,,则退出返回剩余许可数,否则重试
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining;
}
}
}
final void reducePermits(int reductions) {
//自旋
for (;;) {
//获取当前许可数
int current = getState();
//递减后的值 = 当前许可数-减少许可数
int next = current - reductions;
//递减后的值大于当前许可数,抛出错误,结束程序运行
if (next > current) {
throw new Error("Permit count underflow");
}
//反之,剩余许可数正常,CAS更新剩余许可数为最新许可数,并退出
if (compareAndSetState(current, next)) {
return;
}
}
}
final int drainPermits() {
//自旋
for (;;) {
//获取当前许可数
int current = getState();
//当前许可数为0 或者 不为0时CAS重置许可数为0成功,返回当前许可数(可能current并不为0),则退出,否则重试
if (current == 0 || compareAndSetState(current, 0)) {
return current;
}
}
}
@Override
protected boolean tryReleaseShared(int releases) {
//自旋
for (;;) {
//获取当前许可数
int current = getState();
//最大许可数(累加后值)=当前许可数+释放许可数,超过最大计数
int next = current + releases;
//累加后值 小于 当前许可数+释放许可数 小于当前许可数,抛出错误,结束程序运行
if (next < current) {// overflow
throw new Error("Maximum permit count exceeded");
}
//最大许可数 大于等于当前许可数,CAS更新许可数为最大许可数成功,则退出,否则重试
if (compareAndSetState(current, next)) {
return true;
}
}
}
}
//------------------------内部类>AQS同步器抽象实现end------------------------------
//------------------------对外开放>获取相关状态方法start------------------------------
public int availablePermits() {
return sync.getPermits();
}
public boolean isFair() {
return sync instanceof FairSync;
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection getQueuedThreads() {
return sync.getQueuedThreads();
}
@Override
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
//------------------------对外开放>获取相关状态方法end------------------------------
}
四.Semaphore使用
- 位于【Java多线程】JUC之显示锁(Lock)与初识AQS(队列同步器)



