parallelStream在Java8环境中如何使用

  介绍

这期内容当中小编将会给大家带来有关parallelStream在Java8环境中如何使用,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

<强>什么是流吗?

流是Java8中新增加的一个特性,被java猿统称为流。

流不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的迭代器。原始版本的迭代器,用户只能显式地一个一个遍历元素并对其执行某些操作,高级版本的流,用户只要给出需要对其包含的元素执行什么操作,比如“过滤掉长度大于10的字符串”,“获取每个字符串的首字母”等,小溪会隐式地在内部进行遍历,做出相应的数据转换。

流就如同一个迭代器(迭代器),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

而和迭代器又不同的是,流可以并行化操作,迭代器只能命令式地,串行化操作。顾名思义,当使用串行方式去遍历时,每读个项目完后再读下一个项目。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出.Stream的并行操作依赖于Java7中引入的Fork/Join框架(JSR166y)来拆分任务和加速处理过程. java的并行API演变历程基本如下:

1.0 - -1.4中的java.lang.Thread 5.0
中的java.util.concurrent 6.0
中Phasers等的,7.0
中的Fork/Join框架,8.0
中的Lambda ,

流的另外一大特点是,数据源本身可以是无限的。

parallelStream其实就是一个并行执行的流。它通过默认的ForkJoinPool,可能提高你的多线程任务的速度。

流具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作,因此像以下的程式片段:,

List数量=数组。asList (1, 2, 3, 4, 5, 6, 7, 8, 9);   numbers.parallelStream ()   .forEach (:: println);   

你得到的展示顺序不一定会是1,2,3,4,5,6,7,8,9,而可能是任意的顺序,就forEach()这个操作來讲,如果平行处理时,希望最后顺序是按照原来流的数据顺序,那可以调用forEachOrdered()。例:如,

List数量=数组。asList (1, 2, 3, 4, 5, 6, 7, 8, 9);   numbers.parallelStream ()   .forEachOrdered (:: println);   

注意:如果forEachOrdered()中间有其他如过滤()的中介操作,会试着平行化处理,然后最终forEachOrdered()会以原数据顺序处理,因此,使用forEachOrdered()这类的有序处理,可能会(或完全失去)失去平行化的一些优势,实际上中介操作亦有可能如此,例如排序()方法。

要想深入的研究parallelStream之前,那么我们必须先了解ForkJoin框架和ForkJoinPool。本文旨在parallelStream,但因为两种关系甚密,故在此简单介绍一下ForkJoinPool,如有兴趣可以更深入的去了解下ForkJoin * * *(当然,如果你想真正的搞透parallelStream,那么你依然需要先搞透ForkJoinPool)。*

ForkJoin框架是从jdk7中新特性,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

parallelStream在Java8环境中如何使用