栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

011Java并发包012辅助类

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

011Java并发包012辅助类

1 CountDownLatch 1.1 简介

一个同步辅助类,用于将线程阻塞某段时间,等其他线程完成后,唤醒被阻塞的线程继续执行。

CountDownLatch在内部维护了一个计数器,需要在构造方法中传入一个非负整数值。

在线程中通过countDown()方法将计数器减一,当线程调用了await()方法后,会判断计数器是否为0,如果不为0会阻塞当前线程,等计数器为0后继续执行。

在当前计数到达零之前,等待线程会一直阻塞,计数为零时会释放所有等待线程。并且计数器无法被重置,因此只能使用一次,不能重复使用。

底层使用LockSupport用于阻塞和唤醒线程。

1.2 源码分析 1.2.1 构造方法

构造方法需要传入一个非负整数,否则会抛出异常。

通过构造方法将传入的值,设置为AQS中state字段的值。

// CountDownLatch的构造方法
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
...
// Sync继承自AQS
private static final class Sync extends AbstractQueuedSynchronizer {
    ...
    // Sync的构造方法
    Sync(int count) {
        setState(count);
    }
    ...
}
...
// AQS类的setState()方法
protected final void setState(int newState) {
    state = newState;
}
1.2.2 await()方法

调用await()方法时,线程会阻塞,直到计数器的值变为0时,才会继续执行。

通过await()方法,判断AQS中state字段的值是否为0,若为0则不被阻塞,若不为0则阻塞入列当前线程。

// CountDownLatch的await()方法
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
...
// AQS的acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
...
// Sync的tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
1.2.3 countDown()方法

调用countDown()方法时,计数器数值会减1,当计数器被减为0时,唤醒被阻塞的线程。

在countDown()方法中,会对AQS中state字段的值进行双层判断:

1)在减少计数前,判断AQS中state字段的值是否为0,为0则返回false,不为0则减1并继续判断。

2)在减少计数后,判断AQS中state字段的值是否为0,为0则返回true,不为0则返回false。

只有在减少后判断等于0的时候才会唤醒等待线程,如果在减少前判断等于0则表示已经有其他线程唤醒过了,如果在减少后判断大于0则表示不能唤醒。

// CountDownLatch的countDown()方法
public void countDown() {
    sync.releaseShared(1);
}
// AQS的releaseShared()方法
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
// Sync的tryReleaseShared()方法
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
1.3 使用举例
public static void main(String[] args) throws InterruptedException {
    CountDownLatch cdl = new CountDownLatch(3);
    for (int i = 0; i < cdl.getCount(); i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "离开课堂");
            cdl.countDown();
        }, i + "号").start();
    }
    cdl.await();
    System.out.println("所有学生都已离开课堂");
}
2 CyclicBarrier 2.1 简介

一个同步辅助类,用于将线程阻塞某段时间,等其他线程也被阻塞后,唤醒被阻塞的线程,同时由最后阻塞的线程执行指定操作。

CyclicBarrier在内部维护了一个计数器和一个屏障操作,需要在构造方法中传入一个正整数值和一个屏障方法。

在线程中通过await()方法将计数器减一,判断计数器是否等于0,如果不等于0则将线程阻塞,如果等于0则唤醒等待线程,并由当前线程执行屏障方法。

底层使用Lock用于阻塞和唤醒线程。

2.2 源码分析 2.2.1 构造方法

构造方法有两个,最终调用的是同一个。

要求传入一个整型的数量,以及一个屏障操作,当等待的线程达到指定数量时,由最后等待的线程执行屏障操作。

// CyclicBarrier的构造方法,指定计数器和屏障方法
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
// CyclicBarrier的构造方法,指定计数器,默认屏障方法为空
public CyclicBarrier(int parties) {
    this(parties, null);
}
2.2.2 await()方法

调用await()方法后,线程会进入等待状态,直到达到指定数量后,执行屏障操作。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
2.3 使用举例
public static void main(String[] args) {
    CyclicBarrier cb = new CyclicBarrier(7, () -> {
        System.out.println(Thread.currentThread().getName() +  "龙珠是最后一颗,龙珠集齐,召唤神龙");
    });
    for (int i = 1; i <= 7; i++) {
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "龙珠被收集");
                cb.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, String.valueOf(i)).start();
    }
}
3 Semaphore 3.1 简介

一个同步辅助类,可以看做是一个计数信号量,信号量维护了一个许可集。

线程调用acquire()方法时,会尝试获取许可,即将信号量减一并判断是否大于等于0,获取失败则阻塞并等待,直到有信号量释放获取成功后才能继续执行。

线程调用release()方法会释放许可,将信号量加一,并唤醒被阻塞的线程。

底层使用LockSupport用于阻塞和唤醒线程。

3.2 源码分析 3.2.1 构造方法

构造方法有两个,都需要传入整型的许可数量,第一个使用非公平的锁,第二个可以传入参数指定使用公平锁还是非公平锁。

// Semaphore类的非公平锁的构造方法
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// Semaphore类的公平锁的构造方法
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
...
// 非公平锁和公平锁都继承自Sync,Sync继承自AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    Sync(int permits) {
        setState(permits);
    }
    ...
}
...
// AQS类的setState()方法
protected final void setState(int newState) {
    state = newState;
}
3.2.2 acquire()方法

调用acquire()方法会尝试获取许可,获取不到会一直阻塞。

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
3.2.3 release()方法

调用release()方法会释放许可,唤醒被阻塞的线程。

public void release() {
    sync.releaseShared(1);
}
3.3 使用举例
public static void main(String[] args) {
    Semaphore s = new Semaphore(3);
    for (int i = 1; i <= 6; i++) {
        new Thread(() -> {
            try {
                s.acquire();
                System.out.println(Thread.currentThread().getName() + "抢到了");
                Thread.sleep(100 * new Random().nextInt(5));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(Thread.currentThread().getName() + "释放了");
                s.release();
            }
        }, String.valueOf(i)).start();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/322735.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号