栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

juc-09-控制并发流程工具类

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

juc-09-控制并发流程工具类

这篇文章,介绍一些比较常用的控制并发流程的工具类:CountDownLatch、CyclicBarrier、Semaphore、Exchange、Condition。
通过这些工具类,我们可以更好地控制并发流程,多线程之间更好地协同工作。

常用的控制并发流程的工具类简介:

描述
CountDownLatch
倒数门闩
是一个或一组线程等待其他的线程完成工作以后再执行。
CyclicBarrier
循环栅栏
让一组线程达到某个屏障,被阻塞,等到指定数量的线程都达到屏障时,屏障开放,所有被阻塞的线程会继续运行
Semaphore
信号量
用来限制或管理数量有限的资源的使用,例如流量控制
Exchange
数据交换器
两个线程间的数据交换
Condition
条件对象
可以控制线程的“等待”和“唤醒”,实现线程间通信,使用上类似 Object 中的 wait() 和 notify()
2、 CountDownLatch 倒计时门闩

>CountDownLatch 是一组线程等待其他的线程完成工作以后再执行。

CountDownLatch常用API:

方法 描述
CountDownLatch(int count) 创建 CountDownLatch,并且指定数量count,count不能小于0
void await() 当前线程阻塞等待直到闩锁的计数倒数到0。如果线程被中断 interrupted,则抛出InterruptedException
boolean await
(long timeout, TimeUnit unit)
指定等待时间,阻塞当前线程,直到闩锁的计数倒数到0 或者 超时。如果线程被中断 interrupted,则抛出InterruptedException
void countDown() 闩锁的计数减 1 ,如果计数为零,则释放所有等待的线程。
long getCount() 闩锁的当前计数

注意:

  • 可以多个地方同时调用 await()
  • 可以在同个线程中多次 countDown()
2.1 CountDownLatch 用法一(一等多):一个线程等待多个线程都执行完毕,再继续自己的工作


public class CountDownLatchDemo1 {

    public static void main(String[] args) throws InterruptedException {
 CountDownLatch latch = new CountDownLatch(5);
 ExecutorService service = Executors.newFixedThreadPool(5);
 for (int i = 0; i < 5; i++) {
     final int no = i + 1;
     Runnable runnable = new Runnable() {

  @Override
  public void run() {
      try {
   Thread.sleep((long) (Math.random() * 10000));
   System.out.println("运动员 No." + no + "准备好了。");
      } catch (InterruptedException e) {
   e.printStackTrace();
      } finally {
   latch.countDown();
      }
  }
     };
     service.submit(runnable);
 }
 System.out.println("等待所有运动员热身准备.....");
 latch.await();
 System.out.println("所有人准备完毕,鸣发令枪,开跑...");
    }
}


运行结果:

等待所有运动员热身准备.....
运动员 No.3准备好了。
运动员 No.4准备好了。
运动员 No.1准备好了。
运动员 No.5准备好了。
运动员 No.2准备好了。
所有人准备完毕,鸣发令枪,开跑...
2.2 CountDownLatch 用法二(多等一):多个线程等待某一个线程的信号,同时开始执行


public class CountDownLatchDemo2 {

    public static void main(String[] args) throws InterruptedException {
 CountDownLatch begin = new CountDownLatch(1);
 ExecutorService service = Executors.newFixedThreadPool(5);
 for (int i = 0; i < 5; i++) {
     final int no = i + 1;
     Runnable runnable = new Runnable() {
  @Override
  public void run() {
      System.out.println("No." + no + "准备完毕,等待发令枪");
      try {
   begin.await();
   System.out.println("No." + no + "开始跑步了");
      } catch (InterruptedException e) {
   e.printStackTrace();
      }
  }
     };
     service.submit(runnable);
 }
 //裁判员检查发令枪...
 Thread.sleep(5000);
 System.out.println("发令枪响,比赛开始!");
 begin.countDown();
    }
}

运行结果:

No.1准备完毕,等待发令枪
No.3准备完毕,等待发令枪
No.4准备完毕,等待发令枪
No.2准备完毕,等待发令枪
No.5准备完毕,等待发令枪
发令枪响,比赛开始!
No.1开始跑步了
No.3开始跑步了
No.5开始跑步了
No.2开始跑步了
No.4开始跑步了
2.3 CountDownLatch 用法三(多等多):多个线程await等待,多个线程的countDown减一
public class CountDownLatchDemo3 {

    static CountDownLatch latch = new CountDownLatch(3);

    public static void main(String[] args) throws InterruptedException {
 // 启动业务线程,需要等待 CountDownLatch 计数为0 才能执行它的业务
 new Thread(new BusiThread(),"Thread1").start();

 // 模拟一个线程中可以多次 countDown()
 // 这个线程有 2 步操作,假设每步操作完成后都需要扣减 1 次
 new Thread(() -> {
     try {
  TimeUnit.MILLISECONDS.sleep(5);
  System.out.println(Thread.currentThread().getName() + " ready init work step 1st......");
  //第一步工作完成,扣减一次
  latch.countDown();

  TimeUnit.MILLISECONDS.sleep(100);
  System.out.println("begin step 2nd.......");
  System.out.println(Thread.currentThread().getName() + " ready init work step 2nd......");
  //第二步工作完成,再扣减一次
  latch.countDown();
     } catch (InterruptedException e) {
  e.printStackTrace();
     }
 },"Thread1").start();

 // 另外一条子线程,执行一次 countDown()
 new Thread(() -> {
     try {
  TimeUnit.MILLISECONDS.sleep(10);
  System.out.println(Thread.currentThread().getName() + " countDown......");
  latch.countDown();
     } catch (InterruptedException e) {
  e.printStackTrace();
     }
 },"Thread2").start();


 // Main线程指定超时时间阻塞等待
 System.out.println("Main await--");
 latch.await(10,TimeUnit.MILLISECONDS);
 // Main线程等待超时,放弃继续等待,继续执行业务
 System.out.println("time out,Main do its work........");
    }

    //业务线程
    private static class BusiThread implements Runnable {

 @Override
 public void run() {
     try {
  System.out.println("BusiThread await--");
  latch.await();
  System.out.println("BusiThread do business-----");
     } catch (InterruptedException e) {
  e.printStackTrace();
     }
 }

    }
}

运行结果:

BusiThread await--
Main await--
Thread1 ready init work step 1st......
Thread2 countDown......
time out,Main do its work........
begin step 2nd.......
Thread1 ready init work step 2nd......
BusiThread do business-----
3、CyclicBarrier 循环栅栏

>CyclicBarrier 循环栅栏和 CountDownLatch 很类似,都能阻塞一组线程
>让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行

当有大量线程相互配合,分别计算不同任务,并且需要最后统一汇总的时候,我们可以使用 CyclicBarrier 。CyclicBarrier可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务。

举例:全班同学明天中午在校门口集合,都到齐后,一起去郊游。

CyclicBarrier 常用API:

方法 描述
CyclicBarrier(int parties) 创建一个新的CyclicBarrier,当给定数量的参与方(线程)正在等待它时,它将跳闸(打开屏障),并且在屏障被打开时不执行预定义的操作,所有被阻塞的线程会继续运行
CyclicBarrier
(int parties, Runnable barrierAction)
创建一个新的CyclicBarrier,当给定数量的参与方(线程)正在等待它时,它将跳闸(打开屏障),并且在屏障被打开时执行给定的屏障操作 barrierAction,由最后一个进入屏障的线程执行,barrierAction 线程执行完成后,其他使用 CyclicBarrier.await() 的线程,才继续执行。
void await() 当前线程阻塞等待直到指定数量的线程调用了await()
int await
(long timeout, TimeUnit unit)
指定等待时间,阻塞当前线程,直到指定数量的线程调用了await() 或者 超时

CyclicBarrier Demo

public class CyclicBarrierDemo {
    public static void main(String[] args) {
 CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
     @Override
     public void run() {
     // 在屏障被打开时执行给定的屏障操作, 由最后一个进入屏障的线程执行
  System.out.println("线程" + Thread.currentThread().getName() + ":所有人都到场了, 大家统一出发!");
     }
 });
 for (int i = 0; i < 10; i++) {
     new Thread(new Task(i, cyclicBarrier),"Thread"+i).start();
 }
    }

    static class Task implements Runnable {
 private int id;
 private CyclicBarrier cyclicBarrier;

 public Task(int id, CyclicBarrier cyclicBarrier) {
     this.id = id;
     this.cyclicBarrier = cyclicBarrier;
 }

 @Override
 public void run() {
     System.out.println("线程" + Thread.currentThread().getName() + ":现在前往集合地点");
     try {
  Thread.sleep((long) (Math.random() * 10000));
  System.out.println("线程" + Thread.currentThread().getName() + ":到了集合地点,开始等待其他人到达");
  cyclicBarrier.await();
  System.out.println("线程" + Thread.currentThread().getName() + ":出发了");
     } catch (InterruptedException e) {
  e.printStackTrace();
     } catch (BrokenBarrierException e) {
  e.printStackTrace();
     }
 }
    }
}

运行结果:

线程Thread0:现在前往集合地点
线程Thread4:现在前往集合地点
线程Thread2:现在前往集合地点
线程Thread1:现在前往集合地点
线程Thread6:现在前往集合地点
线程Thread3:现在前往集合地点
线程Thread5:现在前往集合地点
线程Thread8:现在前往集合地点
线程Thread7:现在前往集合地点
线程Thread9:现在前往集合地点
线程Thread7:到了集合地点,开始等待其他人到达
线程Thread4:到了集合地点,开始等待其他人到达
线程Thread1:到了集合地点,开始等待其他人到达
线程Thread8:到了集合地点,开始等待其他人到达
线程Thread9:到了集合地点,开始等待其他人到达
线程Thread9:所有人都到场了, 大家统一出发!
线程Thread9:出发了
线程Thread7:出发了
线程Thread4:出发了
线程Thread1:出发了
线程Thread8:出发了
线程Thread0:到了集合地点,开始等待其他人到达
线程Thread5:到了集合地点,开始等待其他人到达
线程Thread2:到了集合地点,开始等待其他人到达
线程Thread3:到了集合地点,开始等待其他人到达
线程Thread6:到了集合地点,开始等待其他人到达
线程Thread6:所有人都到场了, 大家统一出发!
线程Thread6:出发了
线程Thread0:出发了
线程Thread5:出发了
线程Thread2:出发了
线程Thread3:出发了
CountDownLatch和CyclicBarrier辨析
  • 作用不同: CyclicBarrier 要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch 只需等待数字到 0,也就是说, CountDownLatch 用于事件,但是 CyclicBarrier 是用于线程的。

  • 可重用性不同:CountDownLatch 在倒数到 0 并触发门闩打开后,就不能再次使用了,除非新建新的实例;而 CyclicBarrier 可以重复使用。

  • CountDownLatch 是否往下执行,由其它线程觉得,CyclicBarrier放行由一组线程本身控制

4、Semaphore 信号量

用来限制或管理数量有限的资源的使用,例如流量控制

>信号量的作用是维护一个“许可证”的计数,线程可以“获取”许可证,那信号量剩余的许可就减一,线程也可以“释放”一个许可证,那信号量剩余的许可证就加一,当信号量所拥有的许可证数量为0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证。

Semaphore 常用API:

方法 描述
Semaphore(int permits) 指定许可证的数量,创建一个非公平的Semaphore
Semaphore
(int permits, boolean fair)
指定许可证和设置是否使用公平策略,
true表示公平: Semaphore 把等待的线程放到 FIFO 的队列中,当有了新的许可证,可以发给等待最长时间的线程。
void acquire() 从Semaphore获取一个许可证,如果没有则阻塞直到有一个可用
void acquire
(int permits)
从Semaphore获取指定数量的许可证,如果没有则阻塞直到指定数量的许可证可用
void acquireUninterruptibly() 从Semaphore获取一个许可证,如果没有则阻塞直到有一个可用,就是当前线程被中断(interrupted)也不抛出InterruptedException,继续阻塞等待
void release() 释放一个许可证
void release
(int permits)
释放指定数量的许可证
boolean tryAcquire() 如果当前Semaphore有可用的则获取一个许可证,不阻塞当前线程
true: 获取成功
false: 获取失败
boolean tryAcquire
(int permits)
尝试获取指定数量的许可证,不阻塞当前线程
true: 获取成功
false: 获取失败
boolean tryAcquire(long timeout, TimeUnit unit) 指定等待时间,尝试获取一个许可证,如果超时,则放弃等待
true: 获取成功
false: 获取失败
boolean tryAcquire(int permits, long timeout, TimeUnit unit) 指定等待时间,尝试获取指定数量的许可证,如果超时,则放弃等待
true: 获取成功
false: 获取失败
4.1 使用流程
  1. 初始化 Semaphore并指定许可证的数量
  2. 在需要被限制的代码前加 acquire() 或者 acquireUninterruptibly() 方法
  3. 在任务执行结束后,调用 release() 来释放许可证。

Semaphore Demo

public class SemaphoreDemo {

    static Semaphore semaphore = new Semaphore(3, true);

    public static void main(String[] args) {
 ExecutorService service = Executors.newFixedThreadPool(5);
 for (int i = 0; i < 5; i++) {
     service.submit(new Task());
 }
 service.shutdown();
    }

    static class Task implements Runnable {

 @Override
 public void run() {
     try {
  System.out.println(Thread.currentThread().getName() + "准备获取许可证");
  semaphore.acquire(1);
     } catch (InterruptedException e) {
  e.printStackTrace();
     }
     System.out.println(Thread.currentThread().getName() + "拿到了许可证------剩余许可证:"+semaphore.availablePermits());
     try {
  Thread.sleep(2000);
     } catch (InterruptedException e) {
  e.printStackTrace();
     }
     semaphore.release(1);
     System.out.println(Thread.currentThread().getName() + "释放了许可证+++++++剩余许可证:"+semaphore.availablePermits());
 }
    }
}

运行结果:

pool-1-thread-1准备获取许可证
pool-1-thread-3准备获取许可证
pool-1-thread-3拿到了许可证------剩余许可证:1
pool-1-thread-2准备获取许可证
pool-1-thread-2拿到了许可证------剩余许可证:0
pool-1-thread-4准备获取许可证
pool-1-thread-1拿到了许可证------剩余许可证:2
pool-1-thread-5准备获取许可证
pool-1-thread-1释放了许可证+++++++剩余许可证:3
pool-1-thread-4拿到了许可证------剩余许可证:2
pool-1-thread-3释放了许可证+++++++剩余许可证:3
pool-1-thread-2释放了许可证+++++++剩余许可证:3
pool-1-thread-5拿到了许可证------剩余许可证:1
pool-1-thread-4释放了许可证+++++++剩余许可证:2
pool-1-thread-5释放了许可证+++++++剩余许可证:3
4.2 注意点
  1. 一般场景下,获取和释放的许可证数量要一致。
  2. 注意在初始化 semaphore 的时候设置公平性,一般设置为 true 会更合理
  3. 并不是必须由许可证的线程释放那个许可证,事实上,获取和释放许可证的线程并无要求,也许是 A 获取了,然后由 B 释放了,只要逻辑合理即可。
  4. 信号量的作用,除了控制临界区最多同时有N个线程访问外,另一个作用是可以实现“条件等待”,例如线程1需要在线程2完成准备工作后才能开始工作,那么线程1 acquire() ,而线程2完成任务后 release() ,这样的话,相当于是轻量级的 CountDownLatch.
5、Exchange

>两个线程间的数据交换

这个工具类在项目中用的比较少,简单介绍下用法。

public class ExchangeDemo {
    private static final Exchanger> exchange = new Exchanger>();

    public static void main(String[] args) {

 //第一个线程
 new Thread(new Runnable() {
     @Override
     public void run() {
  Set setA = new HashSet();//存放数据的容器
  try {
      // 添加数据
      setA.add("A1");
      setA.add("A2");
      // 先调用 exchange() 的线程会阻塞,直到后面的线程调用 exchange()执行时,才进行数据交换,继续运行
      setA = exchange.exchange(setA);//交换set
      // 处理交换后的数据
      System.out.println(Thread.currentThread().getName() + "----" + setA);
  } catch (InterruptedException e) {
  }
     }
 }, "ThreadA").start();

 //第二个线程
 new Thread(new Runnable() {
     @Override
     public void run() {
  Set setB = new HashSet();//存放数据的容器
  try {
      // 添加数据
      setB.add("B1");
      setB.add("B2");
      setB.add("B3");
      // 先调用 exchange() 的线程会阻塞,直到后面的线程调用 exchange()执行时,才进行数据交换,继续运行
      setB = exchange.exchange(setB);//交换set
      // 处理交换后的数据
      System.out.println(Thread.currentThread().getName() + "----" + setB);

  } catch (InterruptedException e) {
  }
     }
 }, "ThreadB").start();

    }
}

运行结果:

ThreadA----[B2, B3, B1]
ThreadB----[A1, A2]
6、Condition 接口
方法 描述
void await() 当前线程阻塞等待直到被唤醒(signalled) 。如果线程被中断 interrupted,则抛出InterruptedException
void awaitUninterruptibly() 当前线程阻塞等待直到被唤醒(signalled)
void signal() 唤醒等待时间最长的线程,在从 await 返回之前,该线程必须重新获取锁
void signalAll() 唤醒所有正在等待的线程,每个线程必须重新获取锁,然后才能从 await 返回。

signalAll() 和 signal() 区别

  • signalAll() 会唤起所有的正在等待的线程
  • 但是 signal() 是公平的,只会唤起那个等待时间最长的线程。

Condition与 Object 的 wait, notify, notifyAll 对比:

  1. Condition 的 await,signal, singalAll 与 Object 的 wait, notify, notifyAll 都可以实现线程之间的通信,两者在使用上也是非常类似,都需要先获取锁之后才能调用,而不同的是 Object wait,notify 对应的是 synchronized 方式的锁,Condition await,singal 则对应的是 ReentrantLock (实现 Lock 接口的锁对象)对应的锁
  2. Condition是通过 Lock接口的 lock.newCondition() 新建一个Condition实例,而且 Lock实例可以新建多个Condition实例

注意: 在执行 Condition 的 await 和 signal 等方法之前,得要先持有condition相关联的 Lock。

简述Conditiond使用步骤:

  1. 线程1 持有condition相关联的lock,当要等待某个条件时候,它就去执行 condition.await() 方法,一旦执行了 await() 方法,线程1就会释放当前线程所占用的lock,进入阻塞状态。
  2. 通常会有另外一个线程,假设是线程2,抢到condition相关联的lock,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行 condition.signal() 方法,这时JVM就会从被阻塞的线程里找,找到那些等待该 condition 的线程,线程1 如果能够从await()方法返回的话一定是该线程获取了与condition相关联的lock,它的线程状态就会变成 Runnable 可执行状态。
6.1 Condition 演示生产者消费者


public class ConditionDemo {
    // 可生产的最大数量
    private final int maxCount = 5;
    // 当前已有数量
    private int curCount;
    // lock
    private Lock lock = new ReentrantLock();
    // lock新建一个 notFull 的 Condition,
    // 如果notFull.await()  表示已达到最大生产数量,生产者阻塞等待
    private Condition notFull = lock.newCondition();

    // lock新建一个 notEmpty 的 Condition,
    // 如果notEmpty.await()  表示当前数量为空,消费者阻塞等待
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
 ConditionDemo demo = new ConditionDemo();
 for (int i = 0; i < 3; i++) {
     Producer producer = new Producer(demo);
     new Thread(producer, "生产者P" + (i+1)).start();
 }
 for (int i = 0; i < 2; i++) {
     Consumer consumer = new Consumer(demo);
     new Thread(consumer, "消费者C" + (i+1)).start();
 }
    }

    
    public void produce() {
 //获取锁
 lock.lock();
 try {
     // 抢到锁
     while (curCount >= maxCount) {
  System.out.println(Thread.currentThread().getName() + "生产已满,等待消费--");
  // notFull.await()  表示已达到最大生产数量,生产者阻塞等待
  this.notFull.await();
     }
     this.curCount++;
     System.out.println(Thread.currentThread().getName() + "生产,当前数量--" + curCount);
     this.notEmpty.signal();
 } catch (InterruptedException e) {
     e.printStackTrace();
 } finally {
     //释放锁
     lock.unlock();
 }
    }

    
    public void consume() {
 //获取锁
 lock.lock();
 try {
     // 抢到锁
     while (curCount <= 0) {
  System.out.println(Thread.currentThread().getName() + "无货消费,等待生产====");
  // notEmpty.await()  表示当前数量为空,消费者阻塞等待
  this.notEmpty.await();
     }
     this.curCount--;
     System.out.println(Thread.currentThread().getName() + "消费,当前剩余数量--" + curCount);
     this.notFull.signal();
 } catch (InterruptedException e) {
     e.printStackTrace();
 } finally {
     //释放锁
     lock.unlock();
 }
    }

}

//生产者
class Producer implements Runnable {
    private ConditionDemo demo;

    public Producer(ConditionDemo demo) {
 this.demo = demo;
    }

    @Override
    public void run() {
 while (true) {

     try {
  // 生产者生产
  this.demo.produce();
  TimeUnit.MILLISECONDS.sleep((long) (Math.random()*1500));//模拟生产耗时
     } catch (InterruptedException e) {
  e.printStackTrace();
     }
 }
    }
}

//消费者
class Consumer implements Runnable {
    private ConditionDemo demo;

    public Consumer(ConditionDemo demo) {
 this.demo = demo;
    }

    @Override
    public void run() {
 while (true) {

     try {
  // 消费者消费
  this.demo.consume();
  TimeUnit.MILLISECONDS.sleep((long) (Math.random()*1000));//模拟消费耗时
     } catch (InterruptedException e) {
  e.printStackTrace();
     }
 }
    }
}

运行结果:

生产者P1生产,当前数量--1
生产者P2生产,当前数量--2
生产者P3生产,当前数量--3
消费者C2消费,当前剩余数量--2
消费者C1消费,当前剩余数量--1
消费者C2消费,当前剩余数量--0
生产者P2生产,当前数量--1
消费者C1消费,当前剩余数量--0
消费者C2无货消费,等待生产====
生产者P3生产,当前数量--1
消费者C2消费,当前剩余数量--0
生产者P1生产,当前数量--1
生产者P3生产,当前数量--2
生产者P2生产,当前数量--3
消费者C1消费,当前剩余数量--2
...

小结:
上面篇幅介绍了CountDownLatch、CyclicBarrier、Semaphore 和 Exchange 的API,也分别通过 demo 讲解了这几个工具类的使用,然后介绍了 Condition 与 Object 的 wait, notify, notifyAll 的对比,Condition的API,通过demo演示如何使用 Condition 实现线程间的通信(生产者消费者案例)。

>代码:
>github.com/wengxingxia/002juc.git

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号