JUC

Java并发工具包中的Fork/Join框架


ForkJoin是由JDK1.7后提供多线并发处理框架。ForkJoin的框架的基本思想是分而治之。ForkJoin框架其实就是一个线程池ExecutorService的实现,通过工作窃取(work-stealing)算法,获取其他线程中未完成的任务来执行。可以充分利用机器的多处理器优势,利用空闲的线程去并行快速完成一个可拆分为小任务的大任务,类似于分治算法。ForkJoin的目标,就是利用所有可用的处理能力来提高程序的响应和性能。

基础特性

ForkJoin框架的核心是ForkJoinPool类,基于AbstractExecutorService扩展。ForkJoinPool中维护了一个队列数组WorkQueue[],每个WorkQueue维护一个ForkJoinTask数组和当前工作线程。ForkJoinPool实现了工作窃取(work-stealing)算法并执行ForkJoinTask。


案例使用

计算从1-100的和,使用Fork-Join分而治之

public class ForkJoinDemo {
    class CountRecursiveTask extends RecursiveTask<Integer> {
        //达到子任务直接计算的阈值
        private  final int Th = 10;

        private int start;
        private int end;

        public CountRecursiveTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {

            if (end - start <= Th) {
                return count();//如果小于阈值,直接调用最小任务的计算方法
            } else {  //如果仍大于阈值,则继续拆分为2个子任务,分别调用fork方法。
                int middle = (end + start) / 2;
                System.out.println("start:" + start + ";middle:" + middle + ";end:" + end);
                CountRecursiveTask left = new CountRecursiveTask(start, middle);//拆分左边
                left.fork();
                CountRecursiveTask right = new CountRecursiveTask(middle + 1, end);//拆分右边
                right.fork();
                return right.join() + left.join();
            }


        }

        private int count() {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
    }


    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

        Integer sum = forkJoinPool.invoke(new ForkJoinDemo().new CountRecursiveTask(1, 100));
        System.out.println(sum);

    }
}
start:1;middle:50;end:100
start:1;middle:25;end:50
start:51;middle:75;end:100
start:1;middle:13;end:25
start:1;middle:7;end:13
start:14;middle:19;end:25
start:51;middle:63;end:75
start:51;middle:57;end:63
start:64;middle:69;end:75
start:76;middle:88;end:100
start:76;middle:82;end:88
start:89;middle:94;end:100
start:26;middle:38;end:50
start:26;middle:32;end:38
start:39;middle:44;end:50
5050
要注意的是两个任务都 fork 的情况,必须按照 f1.fork(),f2.fork(), f2.join(),f1.join() 这样的顺序,不然有性能问题。JDK官方文档有说明,有兴趣的可以去研究下。

有一点需要注意,Fork-Join框架只是在同时使用多核运行原来的单核程序,每个核心上的程序之间互不交叉。这不会改变算法原来的时间复杂度。比如查询一个超大数组中的最小值,复杂度为O(n),通过Fork-Join框架,将数组分为两部分,递归的查询每部分的最小值,这只是将原来的所有操作用两个核心运行,时间复杂度为O(n/2),其实依然是O(n)。所以,假如排序一亿个数,单核用时30s,双核的用时也不会少于15s,因此,想要通过Fork-Join指数级提升性能是不现实的。但是对于已经成熟的系统,每一个微小部分的提升都有助于提升系统整体性能,从这个角度讲,Fork-Join还是有极大的用处。

 


工作顺序图

下图展示了以上代码的工作过程概要,但实际上Fork/Join框架的内部工作过程要比这张图复杂得多,例如如何决定某一个recursive task是使用哪条线程进行运行;再例如如何决定当一个任务/子任务提交到Fork/Join框架内部后,是创建一个新的线程去运行还是让它进行队列等待。

所以如果不深入理解Fork/Join框架的运行原理,只是根据之上最简单的使用例子观察运行效果,那么我们只能知道子任务在Fork/Join框架中被拆分得足够小后,并且其内部使用多线程并行完成这些小任务的计算后再进行结果向上的合并动作,最终形成顶层结果。我们先从这张概要的过程图开始讨论。

图中最顶层的任务使用submit方式被提交到Fork/Join框架中,后者将前者放入到某个线程中运行,工作任务中的compute方法的代码开始对这个任务T1进行分析。如果当前任务需要累加的数字范围过大(代码中设定的是大于200),则将这个计算任务拆分成两个子任务(T1.1和T1.2),每个子任务各自负责计算一半的数据累加,请参见代码中的fork方法。如果当前子任务中需要累加的数字范围足够小(小于等于200),就进行累加然后返回到上层任务中。


 ForkJoinPool构造函数

ForkJoinPool有四个构造函数,其中参数最全的那个构造函数如下所示:

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode)
  • parallelism:可并行级别,Fork/Join框架将依据这个并行级别的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理,但是千万不要将这个属性理解成Fork/Join框架中最多存在的线程数量,也不要将这个属性和ThreadPoolExecutor线程池中的corePoolSize、maximumPoolSize属性进行比较,因为ForkJoinPool的组织结构和工作方式与后者完全不一样。而后续的讨论中,读者还可以发现Fork/Join框架中可存在的线程数量和这个参数值的关系并不是绝对的关联(有依据但并不全由它决定)。

  • factory:当Fork/Join框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现ThreadFactory接口,而是需要实现ForkJoinWorkerThreadFactory接口。后者是一个函数式接口,只需要实现一个名叫newThread的方法。在Fork/Join框架中有一个默认的ForkJoinWorkerThreadFactory接口实现:DefaultForkJoinWorkerThreadFactory。

  • handler:异常捕获处理器。当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获。

  • asyncMode:这个参数也非常重要,从字面意思来看是指的异步模式,它并不是说Fork/Join框架是采用同步模式还是采用异步模式工作。Fork/Join框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即是说存在于队列中的待执行任务,即可以使用先进先出的工作模式,也可以使用后进先出的工作模式。(当asyncMode设置为ture的时候,队列采用先进先出方式工作;反之则是采用后进先出的方式工作,该值默认为false)

ForkJoinPool还有另外两个构造函数,一个构造函数只带有parallelism参数,既是可以设定Fork/Join框架的最大并行任务数量;另一个构造函数则不带有任何参数,对于最大并行任务数量也只是一个默认值——当前操作系统可以使用的CPU内核数量(Runtime.getRuntime().availableProcessors())。实际上ForkJoinPool还有一个私有的、原生构造函数,之上提到的三个构造函数都是对这个私有的、原生构造函数的调用。

如果你对Fork/Join框架没有特定的执行要求,可以直接使用不带有任何参数的构造函数。也就是说推荐基于当前操作系统可以使用的CPU内核数作为Fork/Join框架内最大并行任务数量,这样可以保证CPU在处理并行任务时,尽量少发生任务线程间的运行状态切换(实际上单个CPU内核上的线程间状态切换基本上无法避免,因为操作系统同时运行多个线程和多个进程)。


 fork方法和join方法

Fork/Join框架中提供的fork方法和join方法,可以说是该框架中提供的最重要的两个方法,它们和parallelism“可并行任务数量”配合工作,可以导致拆分的子任务T1.1、T1.2甚至TX在Fork/Join框架中不同的运行效果。例如TX子任务或等待其它已存在的线程运行关联的子任务,或在运行TX的线程中“递归”执行其它任务,又或者启动一个新的线程运行子任务……

fork方法用于将新创建的子任务放入当前线程的work queue队列中,Fork/Join框架将根据当前正在并发执行ForkJoinTask任务的ForkJoinWorkerThread线程状态,决定是让这个任务在队列中等待,还是创建一个新的ForkJoinWorkerThread线程运行它,又或者是唤起其它正在等待任务的ForkJoinWorkerThread线程运行它。

这里面有几个元素概念需要注意,ForkJoinTask任务是一种能在Fork/Join框架中运行的特定任务,也只有这种类型的任务可以在Fork/Join框架中被拆分运行和合并运行。ForkJoinWorkerThread线程是一种在Fork/Join框架中运行的特性线程,它除了具有普通线程的特性外,最主要的特点是每一个ForkJoinWorkerThread线程都具有一个独立的任务等待队列(work queue),这个任务队列用于存储在本线程中被拆分的若干子任务。

join方法用于让当前线程阻塞,直到对应的子任务完成运行并返回执行结果。或者,如果这个子任务存在于当前线程的任务等待队列(work queue)中,则取出这个子任务进行“递归”执行。其目的是尽快得到当前子任务的运行结果,然后继续执行。


点击查看原文

  • 作者:低调做个路人 (扫码联系作者)
  • 发表时间:2019-12-07 02:46:25
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 评论

    追风筝的人
    阿萨大
    追风筝的人
    阿达撒答
    追风筝的人
    阿萨大