栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

JUC核心控件AQS源码解析第四部分(CyclicBarrier源码解析)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

JUC核心控件AQS源码解析第四部分(CyclicBarrier源码解析)

CyclicBarrier作用和CountDownLatch类似,CyclicBarrier可以重复使用,并且可以指定所有线程到达栅栏后执行什么任务
不过CyclicBarrier和CountDownLatch的原理不同,CyclicBarrier是基于Condition实现的,而Condition是基于AQS的独占锁实现的,CountDownLatch是基于AQS的共享锁实现的

1、使用方法
public void test() {
        ExecutorService e = Executors.newFixedThreadPool(8);
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            e.shutdown();
           System.out.println("所有玩家加载完毕关闭线程池");
        });
        ExecutorService e = Executors.newFixedThreadPool(8);
        for (int i = 1; i <= 5; ++i) // create and start threads
             e.execute(new WorkerRun(barrier, i));



    }
    class WorkerRun implements Runnable {
        private final CyclicBarrier barrier;
        private final int i;

        WorkerRun(CyclicBarrier barrier, int i) {
            this.barrier = barrier;
            this.i = i;
        }

        public void run() {
            doWork(i);
            // 这个线程的任务完成了,调用 countDown 方法
            try {
                barrier.await();
                System.out.println("所有玩家加载完毕");
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            } catch (BrokenBarrierException brokenBarrierException) {
                brokenBarrierException.printStackTrace();
            } 
        }

        void doWork(int i) {
            System.out.println("玩家" + i + "加载完了,等待其他玩家加载");
        }
    }

输出结果

玩家1加载完了,等待其他玩家加载
玩家5加载完了,等待其他玩家加载
玩家4加载完了,等待其他玩家加载
玩家3加载完了,等待其他玩家加载
玩家2加载完了,等待其他玩家加载
所有玩家加载完毕关闭线程池
玩家2知道所有玩家已加载完毕
玩家1知道所有玩家已加载完毕
玩家3知道所有玩家已加载完毕
玩家4知道所有玩家已加载完毕
玩家5知道所有玩家已加载完毕

从输出结果可以看出要所有玩家都加载完毕了才会从各种的await()方法返回

2、成员属性
private static class Generation {
        boolean broken = false;
    }

    
    private final ReentrantLock lock = new ReentrantLock();
    
    private final Condition trip = lock.newCondition();
    
    private final int parties;
    
    private final Runnable barrierCommand;
    
    private Generation generation = new Generation();
    //还有多少个线程在等待 会变
    private int count;
3、初始化方法
//生成一个新的 CyclicBarrier实例
private void nextGeneration() {
        // 最后一个到达的线程把所有等待的线程唤醒
        trip.signalAll();
        // 重置参与的线程
        count = parties;
        //新一代,相当于一个新的CyclicBarrier实例
        generation = new Generation();
    }

//打破栅栏,一般用于被中断时使用
private void breakBarrier() {
    // 设置状态 broken 为 true
    generation.broken = true;
    // 重置 count 为初始值 parties
    count = parties;
    // 唤醒所有已经在等待的线程
    trip.signalAll();
}
4、await()方法
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            //判断栅栏是否被打破   
            if (g.broken)
                throw new BrokenBarrierException();
            //判断是否被中断了
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //等待的线程数减1
            int index = --count;
            //最后一个线程到达了
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //指定了任务就执行任务
                    if (command != null)
                        command.run();
                    // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况
                    ranAction = true;
                    //唤醒所有等待的线程然后重置CyclicBarrier
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                  //没设置超时就调用condition的await()方法
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                    //设置了超时就调用condition的awaitNanos()方法
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                         // 到这里,说明 g != generation, 说明新的一代已经产生,即最后一个线程 await 执行完成,
                    // 那么此时没有必要再抛出 InterruptedException 异常,即中断来晚了记录下来这个中断信息即可
                    // 或者是栅栏已经被打破了,那么也不应该抛出 InterruptedException 异常,
                    // 而是之后抛出 BrokenBarrierException 异常
                        Thread.currentThread().interrupt();
                    }
                }
              // 唤醒后,检查栅栏是否是“破的”
                if (g.broken)
                    throw new BrokenBarrierException();
// 这个 for 循环除了异常,就是要从这里退出了
            // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代
            // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的
            // 那什么时候不满足呢?barrierCommand 执行过程中抛出了异常,那么会执行打破栅栏操作,
            // 设置 broken 为true,然后唤醒这些线程。这些线程会从上面的 if (g.broken) 这个分支抛 BrokenBarrierException 异常返回
            // 当然,还有最后一种可能,那就是 await 超时,此种情况不会从上面的 if 分支异常返回,也不会从这里返回,会执行后面的代码
                if (g != generation)
                    return index;
              // 如果醒来发现超时了,打破栅栏,抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

await()方法的整体流程:
1、获取重入锁,加锁
2、等待线程数减1,如果减1后等待线程数为0了就执行指定的任务,然后唤醒条件队列的所有线程,即把条件队列的所有线程转移到同步队列里,然后重置CyclicBarrier
3、如果减1后等待线程数不为0,就把当前线程加入到条件队列等待唤醒
4、如果被唤醒,会判断是否是新一代线程,不是的话就返回是第几个到达的索引

总结:
1、CyclicBarrier不需要像CountDownLatch那样调用countDown()方法才表示任务完成了,而是调用await()方法就表示任务完成了
2、CyclicBarrier只能是多个线程同时到达才能继续运行,不能一个线程等待其他线程或其他线程等待一个线程,CountDownLatch可以一个线程等待其他线程或其他线程等待一个线程,因为CountDownLatch的阻塞和唤醒方法是分别由不同的线程调用的,CyclicBarrier的阻塞和唤醒方法是由同一批线程调用的
3、CyclicBarrier是基于Condition实现的,而Condition是基于AQS的独占锁实现的,CountDownLatch是基于AQS的共享锁实现的

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/606592.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号