图源:PHP中文网
新类库中的构件JavaSE 5 为并发添加了一些新的组件,这里简单介绍。
CountDownLatch在编写并发程序时,有时候我们会启动多个线程执行某项工作,而主线程需要等待这些线程完成后在继续执行或者退出。如果是Go或者其它语言,此时就必须使用sync.WaitGroup这样的组件去计数和等待。
关于Go的多线程和sync.WaitGroup,可以阅读Go语言编程笔记7:goroutine和通道 - 魔芋红茶’s blog (icexmoon.cn)。
实际上Java也有类似的组件——CountDownLatch。
“Latch”这个单词的意思是“门栓”或者“格栅”,所以很容易看出它的用途。
它的作用和sync.WaitGroup这类“线程计数器”类似,一般需要在线程结束执行后通过CountDownLatch.countDown来释放,而主线程的CountDownLatch.await调用会阻塞,直到计数器归零(也就是所有子线程退出)。和sync.WaitGroup这些计数器不同的是,CountDownLatch的计数只能初始化一次,之后只能调用CountDownLatch.countDown减少计数,而不能增加。
让我们看一个简单示例:
package ch23.countdown_latch;
import java.util.Random;
import java.util.concurrent.TimeUnit;
class CharacterPrinter implements Runnable {
private static char counter = 'a';
private static Random rand = new Random();
private char character = counter++;
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print(character + " ");
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
final int SIZE = 10;
Thread[] threads = new Thread[SIZE];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new CharacterPrinter());
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println();
System.out.println("all task is over.");
}
}
// c b d h i j a e g f
// all task is over.
这个示例中每个线程在休眠一段随机长度的时间后输出一个字母,主线程在所有子线程结束后打印一段话。
就像上面展示的,我们可以用Java学习笔记21:并发(1) - 魔芋红茶’s blog (icexmoon.cn)中介绍过的Thread.join来让主线程等待子线程。但需要注意的是,这样做有两个缺陷:
- 有性能损失。每个子线程结束的早晚是不同的,假设threads[0]是最晚结束的,那么主线程就要一直等待threads[0],然后才能依次等待其它线程。
- 必须持有线程的引用。
可以很容易地用CountDownLatch来改写这个示例:
package ch23.countdown_latch2;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class CharacterPrinter implements Runnable {
private static char counter = 'a';
private static Random rand = new Random();
private char character = counter++;
private CountDownLatch cdl;
public CharacterPrinter(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
try {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print(character + " ");
} finally {
cdl.countDown();
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
final int SIZE = 10;
ExecutorService es = Executors.newCachedThreadPool();
CountDownLatch cdl = new CountDownLatch(SIZE);
for (int i = 0; i < SIZE; i++) {
es.execute(new CharacterPrinter(cdl));
}
cdl.await();
System.out.println();
System.out.println("all task is over.");
es.shutdown();
}
}
// c b d h i j a e g f
// all task is over.
这里在子线程中我使用了try...finaly语句,并在finaly块中调用cdl.countDown()减少计数器,以确保任何情况下子线程退出都不会导致主线程因为计数器没有归零被无限阻塞。
CountDownLatch不能完全替代其它语言中的“线程计数器”,因为它只能初始化一个固定的值。有时候是没法在一开始确定等待线程的总数的,可能子线程会创建额外的子线程。
CyclicBarrier关于CountDownLatch的更多介绍可以阅读CountDownLatch (Java Platform SE 8 ) (oracle.com),官方文档提供了一个很有趣的示例,可以用额外的一组CountDownLatch来让子线程和主线程协同(实际上起到了一个类似Go中的信号通道的作用)。
通常会将CyclicBarrier和CountDownlatch用来比较,因为前者有一个CyclicBarrier.reset方法可以重置计数。但在我看来,这两个组件无论是用途还是使用方式,不能说是一模一样,也可以说是毫不相干。
“cyclic”单词的意思是“循环的”,“barrier”单词的意思是“格栅”或“栅栏”。
尽管这两者差别很大,你依然可以尝试用CyclicBarrier改写之前的示例并替换CountDownLatch,具体的代码可以阅读java-notebook/Main.java (github.com)。这里我不推荐这么做,所以不在这里展示代码。
关于CyclicBarrier的用途,《Thinking in Java》给出了一个有意思的示例:
package ch23.horse;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import util.Fmt;
class Horse implements Runnable {
private static Random rand = new Random();
private static int counter = 0;
private final int id = ++counter;
private int distance; // 当前马匹跑的距离
private CyclicBarrier cb;
public Horse(CyclicBarrier cb) {
this.cb = cb;
}
public synchronized int getDistance() {
return distance;
}
public synchronized void setDistance(int distance) {
this.distance = distance;
}
@Override
public void run() {
while (!Thread.interrupted()) {
setDistance(getDistance() + rand.nextInt(3) + 1);
try {
cb.await();
} catch (InterruptedException e) {
// e.printStackTrace();
} catch (BrokenBarrierException e) {
// e.printStackTrace();
}
}
}
@Override
public String toString() {
return Fmt.sprintf("Horse#%d", id);
}
}
class HorseRaceingTrack implements Runnable {
private final int length = 20; // 跑道长度
private List horse = new ArrayList<>();
private CyclicBarrier cb;
private ExecutorService es;
private class RaceingPrinter implements Runnable {
@Override
public void run() {
// 打印跑道
for (int i = 0; i < length; i++) {
System.out.print("=");
}
System.out.println();
// 打印马匹
boolean gameOver = false;
for (Horse horse : horse) {
int distance = horse.getDistance();
for (int i = 0; i < distance; i++) {
System.out.print("-");
}
System.out.println(horse);
if (distance >= length) {
gameOver = true;
}
}
if (gameOver) {
es.shutdownNow();
}
else{
cb.reset();
}
}
}
public HorseRaceingTrack(int horseNum) {
cb = new CyclicBarrier(horseNum, new RaceingPrinter());
for (int i = 0; i < horseNum; i++) {
horse.add(new Horse(cb));
}
es = Executors.newCachedThreadPool();
}
@Override
public void run() {
for (Horse horse : horse) {
es.execute(horse);
}
}
}
public class Main {
public static void main(String[] args) {
HorseRaceingTrack hrt = new HorseRaceingTrack(5);
new Thread(hrt){}.start();
}
}
这个示例可以看做是一个对“赛马游戏”的仿真,这里Horse表示一匹赛马,HorseRaceingTrack表示赛马场。
赛马持有一个CyclicBarrier引用,其run方法用一个随机数产生器来更新自己的奔跑距离,然后调用CyclicBarrier.await阻塞。CyclicBarrier.await调用后会让CyclicBarrier计数器减1,直到0的时候,所有通过CyclicBarrier.await阻塞的进程才能继续。
这就意味着所有的马匹在线程中更新距离后就等待,直到所有马匹都更新了距离。此时可以看做是回合制游戏中运行了一个回合,我们就可以通过其它程序来打印赛马的当前情况。
有意思的是CyclicBarrier支持两个参数的构造器,可以通过第二个参数指定一个任务(Runnable),该任务会在计数器归零时调用(此时其它等待线程依然阻塞,直到该任务执行结束),因此这里将打印程序RaceingPrinter作为这个任务是个不错的解决方案。
在打印程序中,分别打印赛道和马匹,如果有马匹已经跑到终点,就结束所有子线程(马匹所在线程)。如果游戏还没有结束,就调用CyclicBarrier.reset重置计数器以开始下一轮。此时通过CyclicBarrier.await阻塞的子线程会抛出并捕获一个BrokenBarrierException异常,在这里我们不做处理,直接让马匹继续下一轮“奔跑”。
DelayQueue关于CylicBarrier的更多内容可以阅读CyclicBarrier (Java Platform SE 8 ) (oracle.com)。
有时候我们需要执行一些“延迟执行的任务”,并且希望这些任务能按照延迟时间远近(任务急迫程度)来执行。此时就可以使用DelayQueue。
DelayQueue是一个按照延迟时间长短来排列的队列,延迟时间最短的对象被排列在最前边。DelayQueue中保存的元素都是Delayed接口或其子接口,可以通过该接口的getDelay方法获取剩余延迟时间。
通过DelayQueue.take从队列中获取元素的时候,该元素必须是到期了才能获取,否则就会阻塞。
看下边这个示例:
package ch23.delay_queue;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import util.Fmt;
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = ++counter;
private int delayed; // 推迟的毫秒数
private long trigger;
public DelayedTask(int delayed) {
this.delayed = delayed;
trigger = delayed + System.currentTimeMillis();
}
@Override
public void run() {
System.out.println(this);
}
@Override
public String toString() {
return Fmt.sprintf("task#%d(%dms)", id, delayed);
}
@Override
public int compareTo(Delayed o) {
DelayedTask other = (DelayedTask) o;
if (delayed > other.delayed) {
return 1;
} else if (delayed < other.delayed) {
return -1;
}
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}
public class Main {
public static void main(String[] args) {
Random random = new Random();
DelayQueue dq = new DelayQueue<>();
final int SIZE = 10;
for (int i = 0; i < SIZE; i++) {
dq.add(new DelayedTask(random.nextInt(10000)));
}
int counter = 0;
do {
try {
dq.take().run();
} catch (InterruptedException e) {
e.printStackTrace();
}
counter++;
if (counter >= SIZE) {
break;
}
} while (true);
}
}
// task#4(1167ms)
// task#8(1488ms)
// task#5(2531ms)
// task#9(2670ms)
// task#10(2969ms)
// task#6(3337ms)
// task#7(4334ms)
// task#1(4503ms)
// task#2(5532ms)
// task#3(8136ms)
这个示例中,DelayedTask代表一种延迟一段时间后执行的任务,通过构造器我们传入一个毫秒数,作为该任务的延迟时间。因为getDelay方法需要返回当前任务剩余的延迟时间。所以这里需要根据延迟的毫秒数和当前系统时间的毫秒数计算一个“任务触发时间”trigger。
主线程中产生一些随机长度延迟的任务,并添加进DelayQueue,并通过遍历的方式从DelayQueue获取任务并执行。
可以从结果看到,延迟短的任务被先取出执行,延迟长的任务后取出执行,执行顺序严格按照延迟时间的长短进行。并且从输出可以观察到,这些任务都要“等待”一会,延迟到期了才能得到执行。
PriorityBlockingQueue在介绍Java编程笔记9:容器(下) - 魔芋红茶’s blog (icexmoon.cn)的时候,介绍过优先级队列(PriorityQueue),优先级队列同样有多线程版本——PriorityBlockingQueue。
其用法和单线程版本的优先级队列是类似的,所以这里不做过多介绍,直接看示例:
package ch23.priority_queue; import java.util.PriorityQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import ch15.test2.RandomGenerator.StringGenerator; import ch23.priority_queue.Msg.Priority; import util.Enums; import util.Fmt; class Msg implements Comparable{ public enum Priority { HIGH, MEDDLE, LOW } private Priority p; private String message; public Msg(Priority p, String message) { this.p = p; this.message = message; } @Override public String toString() { return Fmt.sprintf("Msg(%s,%s)", p, message); } @Override public int compareTo(Msg o) { return this.p.compareTo(o.p); } } class MsgWriter implements Runnable { private PriorityQueue pq; private StringGenerator sg; private CountDownLatch cdl; public MsgWriter(PriorityQueue pq, CountDownLatch cdl) { this.pq = pq; this.sg = new StringGenerator(); this.cdl = cdl; } @Override public void run() { Priority p = Enums.random(Msg.Priority.class); pq.add(new Msg(p, sg.next())); cdl.countDown(); } } public class Main { public static void main(String[] args) throws InterruptedException { final int SIZE = 10; ExecutorService es = Executors.newCachedThreadPool(); PriorityQueue pq = new PriorityQueue<>(); CountDownLatch cdl = new CountDownLatch(SIZE); for (int i = 0; i < SIZE; i++) { es.execute(new MsgWriter(pq, cdl)); } cdl.await(); es.shutdown(); do { Msg msg = pq.poll(); if (msg == null) { break; } System.out.print(msg + " "); } while (true); } } // Msg(HIGH,xvnrf) Msg(HIGH,byouk) Msg(HIGH,eyxse) Msg(HIGH,timmp) // Msg(MEDDLE,olrto) Msg(LOW,dchge) Msg(LOW,nixcq)
这里Msg代表一种具备优先级的消息,MsgWriter用于模拟多个线程向优先级队列发送消息。主线程在等待子线程任务结束后,遍历优先级队列取出并打印消息。如果队列空了,PriorityQueue.poll就会返回一个null。
从最后的打印结果能看到,优先级队列中的消息的确是按照定义好的优先级顺序排列的。
ScheduledThreadPoolExecutor如果你使用过智能家居产品,一定对定时任务不陌生,比如目前我使用的空气净化器,我就添加了每天晚上9点启动和每天早上9点关闭的定时任务。
通过ScheduledThreadPoolExecutor这个类,可以在Java多线程编程中实现定时任务的功能:
package ch23.schedule;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class SmartTV {
@SuppressWarnings("unused")
private boolean on = false;
public synchronized void on() {
this.on = true;
System.out.println("tv is turn on.");
}
public synchronized void off() {
this.on = false;
System.out.println("tv is turn off.");
}
}
class TVOnTask implements Runnable {
private SmartTV tv;
public TVOnTask(SmartTV tv) {
this.tv = tv;
}
@Override
public void run() {
tv.on();
}
}
class TVOffTask implements Runnable {
private SmartTV tv;
public TVOffTask(SmartTV tv) {
this.tv = tv;
}
@Override
public void run() {
tv.off();
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
final int SIZE = 5;
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(SIZE);
SmartTV tv = new SmartTV();
executor.schedule(new TVOnTask(tv), 500, TimeUnit.MILLISECONDS);
executor.schedule(new TVOffTask(tv), 2, TimeUnit.SECONDS);
executor.shutdown();
}
}
// tv is turn on.
// tv is turn off.
在这个示例中,智能电视SmartTV将在主线程运行后0.5秒开启,2秒后自动关闭。
此外,ScheduledThreadPoolExecutor还支持设置间隔固定时间执行的任务:
package ch23.schedule2;
...
public class Main {
public static void main(String[] args) throws InterruptedException {
final int SIZE = 5;
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(SIZE);
SmartTV tv = new SmartTV();
executor.scheduleAtFixedRate(new TVOnTask(tv), 1, 5, TimeUnit.SECONDS);
executor.scheduleAtFixedRate(new TVOffTask(tv), 2, 5, TimeUnit.SECONDS);
executor.awaitTermination(20, TimeUnit.SECONDS);
executor.shutdown();
}
}
// tv is turn on.
// tv is turn off.
// tv is turn on.
// tv is turn off.
// tv is turn on.
// tv is turn off.
// tv is turn on.
// tv is turn off.
ScheduledThreadPoolExecutor.scheduleAtFixedRate可以用固定频率执行任务。scheduleAtFixedRate。await方法可以阻塞主线程,直到ScheduledThreadPoolExecutor的相关shutdown方法被调用,或者超时或产生中断。在这里起的作用和sleep是一样的。
Semaphore普通的锁只能允许一个线程取得,Semaphore允许一定数量的线程取得某种资源。
“Semaphore”这个单词可以被翻译为“信号量”。
《Thinking in Java》中用Semaphore实现了一种“对象池”:
package ch23.semaphore; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import util.Fmt; class ObjectPool{ private int size; private List objs; private Semaphore smp; @SuppressWarnings("unused") private Class cls; private boolean[] used; public ObjectPool(int size, Class cls) { this.size = size; this.cls = cls; objs = new ArrayList<>(); for (int i = 0; i < size; i++) { T obj; try { obj = cls.getDeclaredConstructor().newInstance(); } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) { throw new RuntimeException(e); } objs.add(obj); } smp = new Semaphore(size); used = new boolean[size]; for (int i = 0; i < size; i++) { used[i] = false; } } public T out() throws InterruptedException { smp.acquire(); T obj = get(); if (obj == null) { smp.release(); } return obj; } public void in(T obj) { if (back(obj)) { smp.release(); } } private synchronized T get() { for (int i = 0; i < size; i++) { if (!used[i]) { used[i] = true; return objs.get(i); } } return null; } private synchronized boolean back(T obj) { int index = objs.indexOf(obj); if (index < 0) { return false; } if (!used[index]) { return false; } used[index] = false; return true; } } class Student { private static int counter = 0; private final int id = ++counter; @Override public String toString() { return Fmt.sprintf("Student#%d", id); } } class OutTask implements Runnable { private ObjectPool pool; public OutTask(ObjectPool pool) { this.pool = pool; } @Override public void run() { try { T obj = pool.out(); System.out.println(obj + " is get."); } catch (InterruptedException e) { e.printStackTrace(); } } } public class Main { public static void main(String[] args) throws InterruptedException { final int SIZE = 5; ObjectPool pool = new ObjectPool<>(SIZE, Student.class); Student s = pool.out(); ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0; i < SIZE; i++) { es.execute(new OutTask (pool)); } TimeUnit.SECONDS.sleep(2); pool.in(s); es.shutdown(); } } // Student#2 is get. // Student#5 is get. // Student#3 is get. // Student#4 is get. // Student#1 is get.
ObjectPool是一个可以持有指定数量对象的对象池,可以通过out方法取出对象,通过in方法归还对象。取出对象时使用Semaphore.acquire检查取出的对象是否已达到上限,如果超出指定容量,就会阻塞。直到有in方法被调用,并通过Semaphore.release释放信号量,阻塞的线程就会恢复。
OutTask任务只会从线程池中取出对象,不归还。
主线程中的测试代码先在主线程中取出一个对象,然后开启5个子线程执行OutTask任务,显然最后一个线程会在取出对象时被阻塞。主线程在休眠2秒后归还对象,此时那个阻塞的线程就能继续执行。
ExchangerExchanger是一个相当有趣的组件,可以用它在两个线程间交换数据:
package ch23.exchanger;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import util.Fmt;
class Meal {
private static int counter = 0;
private final int id = ++counter;
@Override
public String toString() {
return Fmt.sprintf("Meal#%d", id);
}
}
class ProducerTask implements Runnable {
private Exchanger> exchanger;
public ProducerTask(Exchanger> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
List meals = new CopyOnWriteArrayList<>();
while (!Thread.interrupted()) {
for (int i = 0; i < 5; i++) {
Meal meal = new Meal();
System.out.println(meal + " is cooked.");
meals.add(meal);
}
meals = exchanger.exchange(meals);
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
// System.out.println("");
}
}
}
class ConsumerTask implements Runnable {
private Exchanger> exchanger;
public ConsumerTask(Exchanger> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
List meals = new CopyOnWriteArrayList<>();
try {
while (!Thread.interrupted()) {
meals = exchanger.exchange(meals);
for (Meal meal : meals) {
System.out.println(meal + " is eated.");
meals.remove(meal);
}
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
Exchanger> exchanger = new Exchanger<>();
es.execute(new ConsumerTask(exchanger));
es.execute(new ProducerTask(exchanger));
es.awaitTermination(5, TimeUnit.SECONDS);
es.shutdownNow();
}
}
上面这个示例中,ProducerTask一次生产5个菜品,然后调用Exchanger.exchange,请求和另外一个线程交换List
当然这个过程完全可以用两个通道处理,一个用于传送做好的食物给ConsumerTask,另个一用于传送空盘子给ProducerTask,但对于这个特定问题而言,使用Exchanger更加直观。
虽然这篇有点短,我还是决定先到这里了,谢谢阅读。
你可以从java-notebook (github.com)获取本篇文章的所有示例代码。
参考资料- DelayQueue详解 - myseries - 博客园 (cnblogs.com)



