一、算法
1、二分法2、分治法 二、实践
举个例子实现原理
1)类关系2)主要方法
compute()fork()join() 三、总结
一、算法 1、二分法假设我们遇到一个规模很大的问题,很难一下子解决。那么要怎么处理呢?
思路一:能不能把这个大规模的问题变为每次减少一点呢?比如从n到n-1。
思路二:既然可以每次减少一点,那能不能每次减少几倍?效果会不会更好?
这里我们介绍思路二中的一个比较有名的算法——二分查找(也叫折半查找)。
具体实现思路:在规模较大的问题n中,查找目标值a,每次折半,然后看目标数值a所在的区间,然后继续折半,直到匹配出具体的值。
2、分治法二、实践思路:把一个规模为n的问题,一直对半拆分,拆到可以求解为止,然后将结果整合起来。
fork/join框架
以分治的思想使用线程
fork/join框架是java7提供的用于并行执行任务的框架。
举个例子public class ForkJoinTestDemo extends RecursiveTask{ public long start; public long end; public ForkJoinTestDemo(){} public ForkJoinTestDemo(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { if((end-start)==0){ System.out.println("fork end,start join:"+end); return end; }else { long mid=(start+end)/2; log.info("start="+start+",end="+end+",mid="+mid); ForkJoinTestDemo f1=new ForkJoinTestDemo(start,mid); f1.fork(); ForkJoinTestDemo f2=new ForkJoinTestDemo(mid+1,end); f2.fork(); return f1.join()+f2.join(); } } public static void main(String[] args) { ForkJoinPool pool=new ForkJoinPool(); //代码示例:求1~8数字之和。 System.out.println(pool.invoke(new ForkJoinTestDemo(1,8))); } }
打印结果:
start=1,end=8,mid=4 start=5,end=8,mid=6 start=1,end=4,mid=2 start=5,end=6,mid=5 start=1,end=2,mid=1 start=7,end=8,mid=7 fork end,start join:1 fork end,start join:2 fork end,start join:8 start=3,end=4,mid=3 fork end,start join:3 fork end,start join:4 fork end,start join:5 fork end,start join:6 fork end,start join:7 36
可以看出,分为两个步骤:fork和join。fork阶段做问题拆分,join整合结果。
实现原理 1)类关系上面的例子中,我们创建的ForkJoinTestDemo类,继承自RecursiveTask。关系图如下:
Future:提供了可以获取异步执行的返回值的接口。
ForkJoinTask: 提供在任务中执行fork()和join()操作的机制。主要负责在 ForkJoinWorkerThread 和 ForkJoinPool 方法的中维护其“状态”字段。
RecursiveTask:提供一个带有结果的递归调用。
ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
在RecursiveTask中,实现了ForkJoinTask的exec(),在exec()中执行了compute()。并把计算的结果保存下来。
执行任务,成功时返回true。
public abstract class RecursiveTaskfork()extends ForkJoinTask { V result; protected final boolean exec() { result = compute(); return true; } } public abstract class ForkJoinTask implements Future , Serializable { protected abstract boolean exec(); }
判断是不是一个 ForkJoinWorkerThread 的工作线程,如果是,则将任务加入到内部队列中,否则,由 ForkJoinPool 提供的内部公用的线程池 common 线程池 来执行这个任务。
public final ForkJoinTaskjoin()fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
//当给定任务位于顶部时,弹出该任务。
final boolean tryUnpush(ForkJoinTask> t) {
ForkJoinTask>[] a; int s;
if ((a = array) != null && (s = top) != base &&
U.compareAndSwapObject
(a, (((a.length - 1) & --s) << ASHIFT) + Abase, t, null)) {
U.putOrderedInt(this, QTOP, s);
return true;
}
return false;
}
//任务窃取的主要执行方法。调用 exec 并记录状态。
//其中,上面提到过,我们对exec()重写,自定义处理规则。
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
//当给定任务不在顶部时,帮助和或阻塞,直到给定任务完成或超时。
final int awaitJoin(WorkQueue w, ForkJoinTask> task, long deadline) {
int s = 0;
if (task != null && w != null) {
ForkJoinTask> prevJoin = w.currentJoin;
U.putOrderedObject(w, QCURRENTJOIN, task);
CountedCompleter> cc = (task instanceof CountedCompleter) ?
(CountedCompleter>)task : null;
for (;;) {
//任务完成,跳出循环
if ((s = task.status) < 0)
break;
if (cc != null)
//当前任务是CountedCompleter类型,则尝试从任务队列中获取当前任务的派生子任务来执行完成任务
helpComplete(w, cc, 0);
else if (w.base == w.top || w.tryRemoveAndExec(task))
//窃取任务,如果当前线程的内部队列为空,或者成功完成了任务,帮助某个线程完成任务。
helpStealer(w, task);
//任务完成,跳出循环
if ((s = task.status) < 0)
break;
long ms, ns;
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
if (tryCompensate(w)) {
task.internalWait(ms);
U.getAndAddLong(this, CTL, AC_UNIT);
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s;
}
三、总结
fork/join框架是使用分治法的思想,来合理利用线程。fork/join框架作为并发框架,与单一线程执行相比不一定会快,要做好相关压测和调优。使用“工作窃取”的方式,优化多线程下的任务执行。使用Unsafe直接操作实现原子性。



