栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

JDK8 CyclicBarrier源码分析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

JDK8 CyclicBarrier源码分析

一、作用

CyclicBarrier 循环屏障,类似于起跑线阻塞运动员,所有线程到达屏障后才可以解除阻塞。并且可以重复使用(不像CountDownLatch减到0就无法使用了)。

二、源码分析
public class CyclicBarrier {
    
    private static class Generation {
        boolean broken = false;
    }

    
    private final ReentrantLock lock = new ReentrantLock();
    
    private final Condition trip = lock.newCondition();
    
    private final int parties;
    
    private final Runnable barrierCommand;
    
    private Generation generation = new Generation();

    
    private int count;

    
    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        // 为什么要新建一个对象呢
        generation = new Generation();
    }

    
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 保存当前代
            final Generation g = generation;
            // 如果当前线程可以dowait,那么当前代不应该被破坏
            if (g.broken)
                throw new BrokenBarrierException();
            // 线程被中断的话,应该破坏当前屏障并唤醒所有线程
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // 剩余可阻塞线程数
            int index = --count;
            // 如果所有线程已经到达屏障
            if (index == 0) {  // tripped
                // 初始化标识
                boolean ranAction = false;
                try {
                    // 保存运行动作
                    final Runnable command = barrierCommand;
                    // 如果动作不为null
                    if (command != null)
                        // 动作运行
                        command.run();
                    // 设置标识为运行成功
                    ranAction = true;
                    // 唤醒所有等待线程,屏障修复,进入下一代
                    nextGeneration();
                    return 0;
                } finally {
                    // 假如动作抛出异常,则破坏当前屏障
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 循环直到通过屏障、被打破,中断、超时
            for (;;) {
                try {
                    // 如果没设置超时等待
                    if (!timed)
                        // 释放锁,进入条件等待队列
                        trip.await();
                    // 如果设置了等待时间并且nanos设置等待时间
                    else if (nanos > 0L)
                        // 等待指定时间
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 如果等于当前代(上面index为0并且进入nextGeneration时才会不等于当前代) 并且 屏障没被破坏
                    if (g == generation && ! g.broken) {
                        // 破坏当前屏障
                        breakBarrier();
                        throw ie;
                    } else { // 不等于当前代或者屏障被破坏了
                        // 即使没被中断也将要完成等待 todo
                        // 中断当前线程
                        Thread.currentThread().interrupt();
                    }
                }
                
                // 进入下一代的时候,屏障就已经修复了,所以这里不应该被打破
                // 如果被打破,抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
                
                // 如果进入了下一代(调用了nextGeneration),则说明已经通过屏障,返回当前线程到达屏障时剩余的可阻塞数
                if (g != generation)
                    return index;
                // 如果设置了超时等待时间,并且nanos设置的不正确,则打破屏障并抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    
    public int getParties() {
        return parties;
    }

    
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    
    public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
            BrokenBarrierException,
            TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }


    
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 打破屏障(必须先打破屏障再进入下一代,否则导致并非所有线程到达屏障就通过)
            // 先打破会让在屏障等待的线程抛出异常
            breakBarrier();   
            // 进入下一代
            nextGeneration(); 
        } finally {
            lock.unlock();
        }
    }

    
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/862936.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号