栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

浅析Fork/Join

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

浅析Fork/Join

文章目录

一、算法

1、二分法2、分治法 二、实践

举个例子实现原理

1)类关系2)主要方法

compute()fork()join() 三、总结

一、算法 1、二分法

假设我们遇到一个规模很大的问题,很难一下子解决。那么要怎么处理呢?

思路一:能不能把这个大规模的问题变为每次减少一点呢?比如从n到n-1。
思路二:既然可以每次减少一点,那能不能每次减少几倍?效果会不会更好?


这里我们介绍思路二中的一个比较有名的算法——二分查找(也叫折半查找)。

具体实现思路:在规模较大的问题n中,查找目标值a,每次折半,然后看目标数值a所在的区间,然后继续折半,直到匹配出具体的值。

2、分治法

思路:把一个规模为n的问题,一直对半拆分,拆到可以求解为止,然后将结果整合起来。

二、实践

fork/join框架

以分治的思想使用线程

fork/join框架是java7提供的用于并行执行任务的框架。

举个例子
public class ForkJoinTestDemo extends RecursiveTask {
    public long start;
    public long end;
    
    public ForkJoinTestDemo(){}
    public ForkJoinTestDemo(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if((end-start)==0){
            System.out.println("fork end,start join:"+end);
            return end;
        }else {
            long mid=(start+end)/2;
            log.info("start="+start+",end="+end+",mid="+mid);
            ForkJoinTestDemo f1=new ForkJoinTestDemo(start,mid);
            f1.fork();
            ForkJoinTestDemo f2=new ForkJoinTestDemo(mid+1,end);
            f2.fork();
            return f1.join()+f2.join();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool=new ForkJoinPool();
        //代码示例:求1~8数字之和。
        System.out.println(pool.invoke(new ForkJoinTestDemo(1,8)));
    }

}

打印结果:

start=1,end=8,mid=4
start=5,end=8,mid=6
start=1,end=4,mid=2
start=5,end=6,mid=5
start=1,end=2,mid=1
start=7,end=8,mid=7
fork end,start join:1
fork end,start join:2
fork end,start join:8
start=3,end=4,mid=3
fork end,start join:3
fork end,start join:4
fork end,start join:5
fork end,start join:6
fork end,start join:7
36

可以看出,分为两个步骤:fork和join。fork阶段做问题拆分,join整合结果。

实现原理 1)类关系

上面的例子中,我们创建的ForkJoinTestDemo类,继承自RecursiveTask。关系图如下:

Future:提供了可以获取异步执行的返回值的接口。
ForkJoinTask: 提供在任务中执行fork()和join()操作的机制。主要负责在 ForkJoinWorkerThread 和 ForkJoinPool 方法的中维护其“状态”字段。
RecursiveTask:提供一个带有结果的递归调用。
ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

2)主要方法 compute()

在RecursiveTask中,实现了ForkJoinTask的exec(),在exec()中执行了compute()。并把计算的结果保存下来。

执行任务,成功时返回true。

public abstract class RecursiveTask extends ForkJoinTask {

	V result;
	protected final boolean exec() {
	        result = compute();
	        return true;
	    }
}

public abstract class ForkJoinTask implements Future, Serializable {
    protected abstract boolean exec();
}
fork()

判断是不是一个 ForkJoinWorkerThread 的工作线程,如果是,则将任务加入到内部队列中,否则,由 ForkJoinPool 提供的内部公用的线程池 common 线程池 来执行这个任务。

public final ForkJoinTask fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
join()
 public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }


private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
//当给定任务位于顶部时,弹出该任务。
 final boolean tryUnpush(ForkJoinTask t) {
            ForkJoinTask[] a; int s;
            if ((a = array) != null && (s = top) != base &&
                U.compareAndSwapObject
                (a, (((a.length - 1) & --s) << ASHIFT) + Abase, t, null)) {
                U.putOrderedInt(this, QTOP, s);
                return true;
            }
            return false;
        }
//任务窃取的主要执行方法。调用 exec 并记录状态。
//其中,上面提到过,我们对exec()重写,自定义处理规则。
final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

//当给定任务不在顶部时,帮助和或阻塞,直到给定任务完成或超时。
   final int awaitJoin(WorkQueue w, ForkJoinTask task, long deadline) {
        int s = 0;
        if (task != null && w != null) {
            ForkJoinTask prevJoin = w.currentJoin;
            U.putOrderedObject(w, QCURRENTJOIN, task);
            CountedCompleter cc = (task instanceof CountedCompleter) ?
                (CountedCompleter)task : null;
            for (;;) {
            	//任务完成,跳出循环
                if ((s = task.status) < 0)
                    break;
                if (cc != null)
   //当前任务是CountedCompleter类型,则尝试从任务队列中获取当前任务的派生子任务来执行完成任务
                    helpComplete(w, cc, 0);
                else if (w.base == w.top || w.tryRemoveAndExec(task))
         //窃取任务,如果当前线程的内部队列为空,或者成功完成了任务,帮助某个线程完成任务。
                    helpStealer(w, task);
                //任务完成,跳出循环
                if ((s = task.status) < 0)
                    break;
                long ms, ns;
                if (deadline == 0L)
                    ms = 0L;
                else if ((ns = deadline - System.nanoTime()) <= 0L)
                    break;
                else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                    ms = 1L;
                if (tryCompensate(w)) {
                    task.internalWait(ms);
                    U.getAndAddLong(this, CTL, AC_UNIT);
                }
            }
            U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
        }
        return s;
    }
三、总结

fork/join框架是使用分治法的思想,来合理利用线程。fork/join框架作为并发框架,与单一线程执行相比不一定会快,要做好相关压测和调优。使用“工作窃取”的方式,优化多线程下的任务执行。使用Unsafe直接操作实现原子性。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/755872.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号