上次看完Sempahore在实际工作中的使用后,由于上次时间比较赶,只是写了一个小Demo,来演示Sempahore的使用,今天利用午休时间来看看Semaphore的内部结构以及相应的方法。
二.Semaphore的内部 1.Semaphore的继承关系Semaphore实现了Seializable接口,可以完成序列化。
public class Semaphore implements java.io.Serializable2.Semaphore的成员变量
private static final long serialVersionUID = -3222578661600680210L; //通过继承AbstractQueuedSynchronizer的内部类实例 private final Sync sync;3.Semaphore三个静态内部类 3.1 Sync 抽象静态内部类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
//初始化许可
Sync(int permits) {
//设置AQS中的state变量值为permits对应的个数private volatile int state;
setState(permits);
}
//获得许可
final int getPermits() {
//获取state的值
return getState();
}
//非公平的尝试获取共享的剩余许可个数
final int nonfairTryAcquireShared(int acquires) {
//无限循环
for (;;) {
//获取可用许可个数
int available = getState();
//计算剩余的许可个数,用于判断许可是否够用,以便后面进行操作
int remaining = available - acquires;
//如果剩余的个数小于0或者通过CAS设置成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//共享模式下释放许可
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取可用许可个数
int current = getState();
//加上释放的许可后的总数
int next = current + releases;
//溢出抛出异常
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS操作判断是否设置成功
if (compareAndSetState(current, next))
return true;
}
}
//指定减少多少个许可
final void reducePermits(int reductions) {
for (;;) {
//获取可用许可个数
int current = getState();
//减去减少的许可后剩余的许可数
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
//CAS操作
if (compareAndSetState(current, next))
return;
}
}
//返回当前所有可用的许可,并设置可用许可个数为0
final int drainPermits() {
for (;;) {
//获取可用许可个数
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
3.2 非公平模式 NofairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
//初始化可用许可个数
NonfairSync(int permits) {
super(permits);
}
//共享模式下获取剩余许可个数,调用父类Sync的方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
3.3公平模式 FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
//构造方法,初始化可用许可格式
FairSync(int permits) {
super(permits);
}
//尝试共享模式下获取
protected int tryAcquireShared(int acquires) {
for (;;) {
//判断前方是否有等待的节点
if (hasQueuedPredecessors())
return -1;
//获取可用许可个数
int available = getState();
//计算剩余的许可个数,用于判断许可是否够用,以便后面进行操作
int remaining = available - acquires;
//如果剩余的个数小于0或者通过CAS设置成功返回剩余许可
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
public final boolean hasQueuedPredecessors() {
//等待队列中的尾节点
Node t = tail;
//头部节点,注意这里的head不是等待队列中的节点
Node h = head;
Node s;
//头节点不和尾节点相同,并且当前线程不等于等待队列中节点的线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
4.Semaphore的构造方法
//指定许可个数,默认非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//指定许可个数,并指定许可个数
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
5.acquire方法调用链路
//获取许可
public void acquire() throws InterruptedException {
//调用共享模式下获取许可,默认为1
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//判断是否被中断,被中断抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取许可,若许可数量小于0,则进入等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//封装成一个等待节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取当前节点的前驱
final Node p = node.predecessor();
//若当前等待节点的前驱等于head
if (p == head) {
//尝试共享模式下获取
int r = tryAcquireShared(arg);
//r大于0说明许可足够
if (r >= 0) {
//设置head节点并且广播
setHeadAndPropagate(node, r);
//将前置节点的后驱置为null
p.next = null; // help GC
failed = false;
return;
}
}
//走到这里说明没有获取到许可,没有抢占到锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //挂起当前线程
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 为当前线程和给定模式创建和排队节点。
private Node addWaiter(Node mode) {
//创建节点,将当前线程封装进去
Node node = new Node(Thread.currentThread(), mode);
//获取尾部节点
Node pred = tail;
//如果尾部节点不为空
if (pred != null) {
//将当前节点的前驱指向尾部节点
node.prev = pred;
//CAS操作,因为此时此刻可能有多个线程在做同样的操作,需要判断尾部节点是否被改变
if (compareAndSetTail(pred, node)) {
//将尾部节点的后驱指向当前节点
pred.next = node;
//返回当前节点
return node;
}
}
//设置失败后的操作,即通过自旋完成入队列操作
enq(node);
return node;
}
private Node enq(final Node node) {
//无限for循环当设置成功后结束循环
for (;;) {
//获取尾部节点
Node t = tail;
//判断是否为空,若为空则进行初始化操作
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//将当前节点的前驱设置为尾部节点
node.prev = t;
//CAS操作
if (compareAndSetTail(t, node)) {
//尾部节点的后驱设置为当前节点
t.next = node;
return t;
}
}
}
}
private void setHeadAndPropagate(Node node, int propagate) {
//将head节点赋值给h
Node h = head; // Record old head for check below
//设置当前节点为head节点
setHead(node);
//判断head节点是否为空,或者等待状态小于0
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//若当前节点的后驱为空或者该节点为共享模式
if (s == null || s.isShared())
//共享模式释放操作
doReleaseShared();
}
}
private void doReleaseShared() {
//无线循环
for (;;) {
//获取当前head节点
Node h = head;
//若h不为空,且不等于尾节点
if (h != null && h != tail) {
//获取该节点的等待状态
int ws = h.waitStatus;
//若为需要唤醒
if (ws == Node.SIGNAL) {
//如果失败则继续CAS
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
//获取当前节点的状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点的下一个节点
Node s = node.next;
//该节点不为空或者状态大于0
if (s == null || s.waitStatus > 0) {
s = null;
//从尾部开始往前找,一直找到状态小于0的
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//若该节点不为空
if (s != null)
//唤醒该节点的线程
LockSupport.unpark(s.thread);
}
//尝试获取失败是否应该挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱节点的状态
int ws = pred.waitStatus;
//若为singl,则代码需要唤醒后继节点正常返回true
if (ws == Node.SIGNAL)
return true;
//若大于0代表取消了当前节点排队
if (ws > 0) {
do {
//一直往前面找节点直到找到状态小于0,即能够唤醒当前节点的为止
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//状态必须为0或者是propagate,设置状态为signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
6.release调用链路
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//尝试释放资源
if (tryReleaseShared(arg)) {
//释放资源
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
//自旋
for (;;) {
//获取当前许可数量
int current = getState();
//计算释放许可后剩余许可的总数
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS设置许可数
if (compareAndSetState(current, next))
return true;
}
}
三.总结
其实Semaphore就是利用了一个state来代表所有的资源,每个线程进来尝试获取自己本身需要的资源,若资源不够就进行等待,每个线程抢占完资源后,又释放资源,然后通知到后继节点,进行操作,其实个人感觉好像和生产者和消费者的模式相类似,只不过这里把每个线程封装成一个Node放入一个虚拟的双向队列CLH进行操作,然后通过控制共享变量的值操作这个队列中的一些线程进行相应的操作,就相当于我们操作一个队列中的数据一样,当队列中的数据有了就通知线程来进行消费,当没有了就通知其他线程生产消息。个人理解,勿喷!打球去咯!



