- 思维导图
- 1 任务和任务策略间的隐形耦合
- 1.1 线程饥饿死锁
- 1.2 耗时操作
- 2 定制线程池的大小
- 3 配置ThreadPoolExecutor
- 3.1 线程的创建和销毁
- 3.2 管理队列任务
- 3.3 饱和策略
- 3.4 线程工厂
- 4 扩展ThreadPoolExecutor
- 4.1 示例:给线程池添加统计统计信息
- 5 并行递归算法
- 5.1 谜底框架
- 5.1.1 顺序解决方案
- 5.1.2 并发解决方案
- 5.1.3 并发方案改进
- 参考文献
有些任务需要明确指定一个执行策略如:
- 依赖性任务:如果需要执行的任务依赖其它任务,这些任务则会有隐形的约束。
- 采用线程限制的任务:任务需要它们的Executor确保单线程化。
- 对响应时间敏感的任务:不应该提交到不合适的线程池,比如只有少量线程处理,任务长期得不到响应。
- 使用ThreadLocal的线程:由于线程池线程是公用的,在线程池中,不应该使用ThreadLocal传递属性.
1.1 线程饥饿死锁依赖性的任务需要足够大的线程池;采用线程限制的任务需要顺序执行。应该把这些需求都写入文档。
如果一个任务依赖其它任务,可能产生死锁。
比如下列demo:
public class ThreadDeadLock {
private static ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MICROSECONDS, new LinkedBlockingQueue());
public static class RenderPageTask implements Callable {
@Override
public String call() throws Exception {
//获取页眉任务
Future headerFuture = executorService.submit(() -> "header");
Future footerFuture = executorService.submit(() -> "footer");
return headerFuture.get()+ "mainContent" + footerFuture.get();
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
Future submit = ThreadDeadLock.executorService.submit(new RenderPageTask());
System.out.println(submit.get());
executorService.shutdown();
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
由于设置了单线程的线程池,提交的任务首先执行,等待页眉页脚任务执行结束,但是任务中提交的页眉,页脚任务将会被添加到队列中等待线程空闲。这一过程造成了死锁。
1.2 耗时操作如果任务由于过长的执行时间阻塞,即使不死锁,也会响应性很差。
2 定制线程池的大小通过限制任务等待资源时间,而不是无限制等下去,可以缓解此类任务的影响。
线程池合理长度取决于提交的任务类型和当前系统。
不要硬编码,应该动态决定。
- 对于计算密集性任务,一个有N个处理器的系统通常使用N+1个线程的线程池获取最优计算率。
- 对于IO和包含其它阻塞操作的任务:
核心线程大小、最大线程数和存活时间共同管理线程的创建和销毁。
ThreadPoolExecutor通用构造器
对于核心线程如果不设置allowCoreThreadTimeOut=true 不会被回收。
3.2 管理队列任务ThreadPoolExecutor提供了一个BlockQueue来持有等待的任务。一般有:无限队列、有限队列和同步移交。
如下单线程池:
默认使用了链表阻塞队列,是无限队列,任务饱和策略可能会失效,我们一般需要自定义队列为有限队列。
当一个有限队列满了之后,开始执行饱和策略
有以下几种饱和策略:
- CallerRunsPolicy:除非线程池已经关闭,否则在调用线程中执行该任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- AbortPolicy:抛出默认拒绝执行异常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
- DiscardPolicy:什么也不执行,静默的丢弃任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
- DiscardOldestPolicy:线程池关闭直接丢弃,否则,移除最久的任务,重新执行该任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
当工作队列满了之后,没有预设的策略阻止execute,不过我们可以在外部限制,比如使用Semaphore信号量:
public class BoundsExecutor {
private ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
private Semaphore semaphore;
public BoundsExecutor(int bound) {
this.semaphore = new Semaphore(bound);
}
public void summitTask(final Runnable runnable) throws InterruptedException {
semaphore.acquire();
try {
executorService.execute(() -> {
try {
runnable.run();
} finally {
semaphore.release();
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
通过信号量预设提交任务限制,提交任务必须获取许可,否则只能等待其它任务执行完。
3.4 线程工厂有时需要定制线程工厂,比如我们需要设置未捕获异常处理器UncaughtExceptionHandler。
定制线程工厂只需要实现ThreadFactory即可:
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
@Override
public Thread newThread(Runnable r) {
return new myAppThread(r, poolName);
}
}
class myAppThread extends Thread {
private static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debug = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = java.util.logging.Logger.getAnonymousLogger();
public myAppThread(Runnable r, String poolName) {
super(r, poolName + "-" + created.getAndIncrement());
setUncaughtExceptionHandler((t, e) -> {
log.log(Level.SEVERE, "UNCAUGHT in thread" + t.getName(), e);
});
}
@Override
public void run() {
boolean tag = debug;
if (tag) {
log.log(Level.FINE, "Created:" + getName());
}
try {
alive.getAndIncrement();
super.run();
} finally {
alive.getAndDecrement();
if (tag) {
log.log(Level.FINE, "Exiting:" + getName());
}
}
}
}
如上可以设置线程池名字,并为创建的线程设置未捕获异常处理器,同时添加了统计创建的线程数和存活的线程数。
4 扩展ThreadPoolExecutorThreadPoolExecutor的设计是可扩展的,提供了钩子函数:beforeExecute、afterExecute和terminated用于重写扩展线程池行为。
beforeExecute、afterExecute出现位置线程池runWorker:
terminated出现在线程池关闭:
示例demo:
public class TimingThreadPool extends ThreadPoolExecutor {
private final Logger log = Logger.getLogger(TimingThreadPool.class.toString());
private final ThreadLocal startTime = new ThreadLocal<>();
private final AtomicLong totalTime = new AtomicLong();
private final AtomicLong numTasks = new AtomicLong();
public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.info(String.format("Thread-%s, start task-%s", t, r));
startTime.set(System.nanoTime());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long end = System.nanoTime();
long taskTime = end - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.info(String.format("Thread %s : end %s, time=%dns", Thread.currentThread(), r, taskTime));
}finally {
super.afterExecute(r, t);
}
}
@Override
protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
public static void main(String[] args) throws InterruptedException {
TimingThreadPool timingThreadPool = new TimingThreadPool(3, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new AbortPolicy());
for (int i = 0; i < 100; i++) {
timingThreadPool.submit(() -> {
System.out.println("ceshi");
});
}
timingThreadPool.shutdown();
timingThreadPool.awaitTermination(5L, TimeUnit.SECONDS);
}
}
这里主要是管理任务开始结束时间和最后线程池关闭时任务平均执行时间。
5 并行递归算法当每个迭代都是独立的,并且迭代工作足以弥补新任务的开销,可以考虑并行化处理。
如下就是将顺序递归转化为并行处理的示例:
public void sequentialRecursive(List nodeList, Collection resultList) {
for (Node node : nodeList) {
resultList.add(node.compute());
sequentialRecursive(node.getChilds(), resultList);
}
}
public void parallelRecursive(final ExecutorService executorService, List nodeList, Collection resultList) {
for (Node node : nodeList) {
executorService.execute(() -> {
resultList.add(node.compute());
});
parallelRecursive(executorService, node.getChilds(), resultList);
}
}
上述遍历的过程依然是顺序的,但是compute过程是并发的。
如下是对并行结果的获取:
public Collection getParallelResult(List nodeList) throws InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
Queue result = new ConcurrentLinkedDeque<>();
parallelRecursive(executorService, nodeList, result);
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return result;
}
5.1 谜底框架
定义一个初始位置,一个目标位置,按照一系列规则找到移动到目标点的移动路径。(具体可参考JAVA并发编程实战8.5.1节)。
谜题抽象如下:
public interface Puzzle{ P initialPosition(); boolean isGoal(P position); Set
legalMoves(P positon); P move(P position, M move); }
Node代表一个位置经过一系列移动到达该位置:
public class Node5.1.1 顺序解决方案{ final P pos; final M mov; final Node
pre; public Node(P pos, M mov, Node
pre) { this.pos = pos; this.mov = mov; this.pre = pre; } List
asMoveList() { List result = new LinkedList<>(); for (Node node = this; node.mov != null; node = node.pre) { result.add(0, node.mov); } return result; } }
示例demo:
public class SequentialPuzzleSolver{ private final Puzzle
puzzle; private final Set
seen = new HashSet<>(); public SequentialPuzzleSolver(Puzzle
puzzle) { this.puzzle = puzzle; } public List
solver() { P pos = puzzle.initialPosition(); return search(new Node (pos, null, null)); } private List
search(Node node) { if (!seen.contains(node.pos)) { seen.add(node.pos); if (puzzle.isGoal(node.pos)) { return node.asMoveList(); } for (M legalMove : puzzle.legalMoves(node.pos)) { P movePos = puzzle.move(node.pos, legalMove); Node
pmNode = new Node
(movePos, legalMove, node); List
result = search(pmNode); if (result != null) { return result; } } } return null; } }
主要是通过顺序广度有限搜索进行判断是否符合目标要求。
5.1.2 并发解决方案从5.1.1可以看出这些迭代过程是独立的,因此考虑对代码做如下改进:
public class ParallelPuzzleSolver{ private final Puzzle
puzzle; private final ExecutorService executorService; private final ConcurrentHashMap
seen; final ValueLatch
> valueLatch = new ValueLatch<>(); public ParallelPuzzleSolver(Puzzle puzzle, ExecutorService executorService, ConcurrentHashMap
seen) { this.puzzle = puzzle; this.executorService = executorService; this.seen = seen; } public List
solver() throws InterruptedException { try { P pos = puzzle.initialPosition(); //提交任务 executorService.execute(newTask(pos, null, null)); Node result = valueLatch.getValue(); return result == null ? null : result.asMoveList(); } finally { executorService.shutdown(); } } protected Runnable newTask(P pos, M mov, Node
pre) { return new PuzzleTask(pos, mov, pre); } class PuzzleTask extends Node
implements Runnable { public PuzzleTask(P pos, M mov, Node
pre) { super(pos, mov, pre); } @Override public void run() { if (valueLatch.isSet() || seen.putIfAbsent(pos, true) == null) { return; } if (puzzle.isGoal(pos)) { valueLatch.setValue(this); } else { //并发处理 for (M legalMove : puzzle.legalMoves(pos)) { executorService.execute(newTask(puzzle.move(pos, legalMove), legalMove, this)); } } } } }
结果封装(利用闭锁传递搜索信息):
public class ValueLatch{ final CountDownLatch countDownLatch = new CountDownLatch(1); private T value = null; public boolean isSet() { return countDownLatch.getCount() == 0; } public synchronized void setValue(T val) { if (!isSet()) { value = val; countDownLatch.countDown(); } } public T getValue() throws InterruptedException { countDownLatch.await(); synchronized (this) { return value; } } }
主要是利用线程池,将每个搜索过程改为任务,提高了并发性。
5.1.3 并发方案改进上述并发解决方案,如果没有结果将会导致线程阻塞在countDownLatch.await(),因此需要对没有结果的情况进行考虑,改进如下:
public class PuzzleSolverextends ParallelPuzzleSolver
{ private final AtomicInteger taskCount = new AtomicInteger(0); public PuzzleSolver(Puzzle
puzzle, ExecutorService executorService, ConcurrentHashMap
seen) { super(puzzle, executorService, seen); } @Override protected Runnable newTask(P pos, M mov, Node
pre) { return super.newTask(pos, mov, pre); } class CountSolverTask extends PuzzleTask { public CountSolverTask(P pos, M mov, Node
pre) { super(pos, mov, pre); taskCount.incrementAndGet(); } @Override public void run() { try { super.run(); } finally { if (taskCount.decrementAndGet() == 0) { valueLatch.setValue(null); } } } } }
主要是利用taskCount统计开始的任务个数,当所有搜索任务都开始,并执行结束后taskCount为0,如果此时还没有返回,则说明没有结果,需要设置一个null值。
参考文献[1]. 《JAVA并发编程实战》.



