栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java并发编程Semaphore计数信号量详解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java并发编程Semaphore计数信号量详解

Semaphore 是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量)。

简单示例:

package me.socketthread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreLearn { 
  //信号量总数 
  private static final int SEM_MAX = 12; 
  public static void main(String[] args) {  
    Semaphore sem = new Semaphore(SEM_MAX); 
    //创建线程池 
    ExecutorService threadPool = Executors.newFixedThreadPool(3); 
    //在线程池中执行任务 
    threadPool.execute(new MyThread(sem, 7)); 
    threadPool.execute(new MyThread(sem, 4)); 
    threadPool.execute(new MyThread(sem, 2)); 
    //关闭池 
    threadPool.shutdown(); 
  } 
} 
  class MyThread extends Thread { 
    private volatile Semaphore sem;  // 信号量 
    private int count;    // 申请信号量的大小  
     
    MyThread(Semaphore sem, int count) { 
      this.sem = sem; 
      this.count = count; 
    } 
    public void run() { 
      try { 
// 从信号量中获取count个许可 
 sem.acquire(count); 
 Thread.sleep(2000); 
 System.out.println(Thread.currentThread().getName() + " acquire count="+count); 
      } catch (InterruptedException e) { 
 e.printStackTrace(); 
      } finally { 
 // 释放给定数目的许可,将其返回到信号量。 
 sem.release(count); 
 System.out.println(Thread.currentThread().getName() + " release " + count + ""); 
      } 
    } 
  } 

执行结果:

pool-1-thread-2 acquire count=4
pool-1-thread-1 acquire count=7
pool-1-thread-1 release 7
pool-1-thread-2 release 4
pool-1-thread-3 acquire count=2
pool-1-thread-3 release 2

线程1和线程2会并发执行,因为两者的信号量和没有超过总信号量,当前两个线程释放掉信号量之后线程3才能继续执行。

源码分析:

1、构造函数

在构造函数中会初始化信号量值,这值最终是作为锁标志位state的值

Semaphore sem = new Semaphore(12);//简单来说就是给锁标识位state赋值为12 

2、Semaphore.acquire(n);简单理解为获取锁资源,如果获取不到线程阻塞

Semaphore.acquire(n);//从锁标识位state中获取n个信号量,简单来说是state = state-n 此时state大于0表示可以获取信号量,如果小于0则将线程阻塞 
public void acquire(int permits) throws InterruptedException { 
    if (permits < 0) throw new IllegalArgumentException(); 
    //获取锁 
    sync.acquireSharedInterruptibly(permits); 
  } 

acquireSharedInterruptibly中的操作是获取锁资源,如果可以获取则将state= state-permits,否则将线程阻塞

public final void acquireSharedInterruptibly(int arg) 
      throws InterruptedException { 
    if (Thread.interrupted()) 
      throw new InterruptedException(); 
    if (tryAcquireShared(arg) < 0)//tryAcquireShared中尝试获取锁资源 
      doAcquireSharedInterruptibly(arg); //将线程阻塞 
  } 

tryAcquireShared中的操作是尝试获取信号量值,简单来说就是state=state-acquires ,如果此时小于0则返回负值,否则返回大于新值,再判断是否将当线程线程阻塞

protected int tryAcquireShared(int acquires) { 
      for (;;) { 
 if (hasQueuedPredecessors()) 
   return -1; 
      //获取state值 
 int available = getState(); 
      //从state中获取信号量 
 int remaining = available - acquires; 
 if (remaining < 0 || 
   compareAndSetState(available, remaining)) 
 //如果信号量小于0则直接返回,表示无法获取信号量,否则将state值修改为新值 
   return remaining; 
      } 
    } 

doAcquireSharedInterruptibly中的操作简单来说是将当前线程添加到FIFO队列中并将当前线程阻塞。

/会将线程添加到FIFO队列中,并阻塞  
private void doAcquireSharedInterruptibly(int arg)  
    throws InterruptedException {  
    //将线程添加到FIFO队列中  
    final Node node = addWaiter(Node.SHARED);  
    boolean failed = true;  
    try {  
      for (;;) {  
 final Node p = node.predecessor();  
 if (p == head) {  
   int r = tryAcquireShared(arg);  
   if (r >= 0) {  
     setHeadAndPropagate(node, r);  
     p.next = null; // help GC  
     failed = false;  
     return;  
   }  
 }  
 //parkAndCheckInterrupt完成线程的阻塞操作  
 if (shouldParkAfterFailedAcquire(p, node) &&  
   parkAndCheckInterrupt())  
   throw new InterruptedException();  
      }  
    } finally {  
      if (failed)  
 cancelAcquire(node);  
    }  
  } 

3、Semaphore.release(int permits),这个函数的实现操作是将state = state+permits并唤起处于FIFO队列中的阻塞线程。

public void release(int permits) { 
    if (permits < 0) throw new IllegalArgumentException(); 
  //state = state+permits,并将FIFO队列中的阻塞线程唤起 
    sync.releaseShared(permits); 
  } 

releaseShared中的操作是将state = state+permits,并将FIFO队列中的阻塞线程唤起。

public final boolean releaseShared(int arg) { 
    //tryReleaseShared将state设置为state = state+arg 
    if (tryReleaseShared(arg)) { 
      //唤起FIFO队列中的阻塞线程 
      doReleaseShared(); 
      return true; 
    } 
    return false; 
  } 

tryReleaseShared将state设置为state = state+arg

protected final boolean tryReleaseShared(int releases) { 
      for (;;) { 
 int current = getState(); 
 int next = current + releases; 
 if (next < current) // overflow 
   throw new Error("Maximum permit count exceeded"); 
 //将state值设置为state=state+releases 
 if (compareAndSetState(current, next)) 
   return true; 
      } 
    } 

doReleaseShared()唤起FIFO队列中的阻塞线程

private void doReleaseShared() {  
  
    for (;;) {  
      Node h = head;  
      if (h != null && h != tail) {  
 int ws = h.waitStatus;  
 if (ws == Node.SIGNAL) {  
   if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  
     continue;      // loop to recheck cases  
   //完成阻塞线程的唤起操作  
   unparkSuccessor(h);  
 }  
 else if (ws == 0 &&  
      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
   continue; // loop on failed CAS  
      }  
      if (h == head)   // loop if head changed  
 break;  
    }  
  }  

总结:Semaphore简单来说设置了一个信号量池state,当线程执行时会从state中获取值,如果可以获取则线程执行,并且在执行后将获取的资源返回到信号量池中,并唤起其他阻塞线程;如果信号量池中的资源无法满足某个线程的需求则将此线程阻塞。

Semaphore源码:

public class Semaphore implements java.io.Serializable { 
  private static final long serialVersionUID = -3222578661600680210L; 
  private final Sync sync; 
  abstract static class Sync extends AbstractQueuedSynchronizer { 
    private static final long serialVersionUID = 1192457210091910933L; 
    //设置锁标识位state的初始值 
    Sync(int permits) { 
      setState(permits); 
    } 
    //获取锁标识位state的值,如果state值大于其需要的值则表示锁可以获取 
    final int getPermits() { 
      return getState(); 
    } 
    //获取state值减去acquires后的值,如果大于等于0则表示锁可以获取 
    final int nonfairTryAcquireShared(int acquires) { 
      for (;;) { 
 int available = getState(); 
 int remaining = available - acquires; 
 if (remaining < 0 || 
   compareAndSetState(available, remaining)) 
   return remaining; 
      } 
    } 
    //释放锁 
    protected final boolean tryReleaseShared(int releases) { 
      for (;;) { 
 int current = getState(); 
 //将state值加上release值 
 int next = current + releases; 
 if (next < current) // overflow 
   throw new Error("Maximum permit count exceeded"); 
 if (compareAndSetState(current, next)) 
   return true; 
      } 
    } 
    //将state的值减去reductions 
    final void reducePermits(int reductions) { 
      for (;;) { 
 int current = getState(); 
 int next = current - reductions; 
 if (next > current) // underflow 
   throw new Error("Permit count underflow"); 
 if (compareAndSetState(current, next)) 
   return; 
      } 
    } 
    final int drainPermits() { 
      for (;;) { 
 int current = getState(); 
 if (current == 0 || compareAndSetState(current, 0)) 
   return current; 
      } 
    } 
  } 
  //非公平锁 
  static final class NonfairSync extends Sync { 
    private static final long serialVersionUID = -2694183684443567898L; 
    NonfairSync(int permits) { 
      super(permits); 
    } 
    protected int tryAcquireShared(int acquires) { 
      return nonfairTryAcquireShared(acquires); 
    } 
  } 
  //公平锁 
  static final class FairSync extends Sync { 
    private static final long serialVersionUID = 2014338818796000944L; 
    FairSync(int permits) { 
      super(permits); 
    } 
    protected int tryAcquireShared(int acquires) { 
      for (;;) { 
 if (hasQueuedPredecessors()) 
   return -1; 
 int available = getState(); 
 int remaining = available - acquires; 
 if (remaining < 0 || 
   compareAndSetState(available, remaining)) 
   return remaining; 
      } 
    } 
  } 
  //设置信号量 
  public Semaphore(int permits) { 
    sync = new NonfairSync(permits); 
  } 
  public Semaphore(int permits, boolean fair) { 
    sync = fair ? new FairSync(permits) : new NonfairSync(permits); 
  } 
  //获取锁 
  public void acquire() throws InterruptedException { 
    sync.acquireSharedInterruptibly(1); 
  } 
  public void acquireUninterruptibly() { 
    sync.acquireShared(1); 
  } 
  public boolean tryAcquire() { 
    return sync.nonfairTryAcquireShared(1) >= 0; 
  } 
  public boolean tryAcquire(long timeout, TimeUnit unit) 
    throws InterruptedException { 
    return sync.tryAcquireSharedNanos(1, unit.tonanos(timeout)); 
  } 
  public void release() { 
    sync.releaseShared(1); 
  } 
  //获取permits值锁 
  public void acquire(int permits) throws InterruptedException { 
    if (permits < 0) throw new IllegalArgumentException(); 
    sync.acquireSharedInterruptibly(permits); 
  } 
  public void acquireUninterruptibly(int permits) { 
    if (permits < 0) throw new IllegalArgumentException(); 
    sync.acquireShared(permits); 
  } 
  public boolean tryAcquire(int permits) { 
    if (permits < 0) throw new IllegalArgumentException(); 
    return sync.nonfairTryAcquireShared(permits) >= 0; 
  } 
  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) 
    throws InterruptedException { 
    if (permits < 0) throw new IllegalArgumentException(); 
    return sync.tryAcquireSharedNanos(permits, unit.tonanos(timeout)); 
  } 
  //释放 
  public void release(int permits) { 
    if (permits < 0) throw new IllegalArgumentException(); 
    sync.releaseShared(permits); 
  } 
  public int availablePermits() { 
    return sync.getPermits(); 
  } 
  public int drainPermits() { 
    return sync.drainPermits(); 
  } 
  protected void reducePermits(int reduction) { 
    if (reduction < 0) throw new IllegalArgumentException(); 
    sync.reducePermits(reduction); 
  } 
  public boolean isFair() { 
    return sync instanceof FairSync; 
  } 
  public final boolean hasQueuedThreads() { 
    return sync.hasQueuedThreads(); 
  } 
  public final int getQueueLength() { 
    return sync.getQueueLength(); 
  } 
  protected Collection getQueuedThreads() { 
    return sync.getQueuedThreads(); 
  } 
  public String toString() { 
    return super.toString() + "[Permits = " + sync.getPermits() + "]"; 
  } 
} 

总结

以上就是本文关于Java并发编程Semaphore计数信号量详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Java并发编程之重入锁与读写锁、Java系统的高并发解决方法详解、java高并发锁的3种实现示例代码等,有什么问题,可以留言交流讨论。感谢朋友们对本站的支持!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/143534.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号