0
点赞
收藏
分享

微信扫一扫

Java8的Stream流如此强大,你知道它的原理吗


📝​​亲录:【深入浅出版】Java全套学习路线规划及配套视频和笔记​​

简介:大家好,我是枫哥🌹,一线互联网的IT民工、📝资深面试官、🌹Java跳蚤网课堂创始人。拥有多年一线研发经验,曾就职过科大讯飞、美团网、平安等公司。在上海有自己小伙伴组建的副业团队,目前业余时间专注Java技术分享,春招、秋招、社招,跳槽,一对一学习辅助,项目接活开发。


目录

​​1.Stream的组成和特点。​​

​​2.BaseStream接口。​​

​​3.Stream接口。​​

​​4.关闭流操作。​​

​​5.并行流和串行流。​​

​​6.Work Stealing原理:​​

​​7.从ForkJoinPool的角度看ParallelStream。​​

​​小结:​​

​​8.并行流的性能。​​

​​9.NQ模型。​​

​​10.遇到顺序。​​

Java8API添加了一种新的抽象,称为流Stream,可以让你以声明的方式处理数据。

Stream 使用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。

Stream API可以大大提高Java程序员的生产力,让程序员写出高效、干净、简洁的代码。

本文将分析Stream的实现原理。

1.Stream的组成和特点。

Stream(流)是一个来自数据源的元素队列,支持聚合操作:

  • 元素是一个特定类型的对象,形成一个队列。Java中的Stream不会像集合那样存储和管理元素,而是按需计算。
  • 数据源流的来源可以是集合Collection、数组Aray、I/Ochanel、生成器generator等。
  • 类似SQL语句的聚合操作,如filter、map、reduce、find、match、sorted等。

和以前的Collection操作不同, Stream操作还有两个基础的特征:

  • Pipelining:中间操作会返回流对象本身。这样多个操作就可以串联成一个管道,就像流式风格一样。这样可以优化操作,如延迟执行(laziness evaluation)和短路(short-circuiting)
  • 内部迭代:以前通过Iterator或For-Each对集合遍历进行显式迭代,称为外部迭代。Stream通过访问者模式(Visitor)提供内部迭代。

与迭代器不同的是,Stream可以并行操作,迭代器只能命令的和串行化操作。顾名思义,当使用串行方式遍历时,每个item读完再读下一个item。当使用并行遍历时,数据将分为多个段,每个段处理在不同的线程中,然后一起输出结果。

Stream的并行操作取决于Java7中引入的Fork/Join框架(JSR166y)来分割任务和加速处理过程。Java并行API的演变过程基本如下:

1.0-1.4 中的 java.lang.Thread

5.0 中的 java.util.concurrent

6.0 中的 Phasers 等

7.0 中的 Fork/Join 框架

8.0 中的 Lambda

Stream具有平行处理能力,处理过程将分为多个小任务,这意味着每个任务都是一个操作:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);numbers.parallelStream()       .forEach(out::println);

可以看到一行简单的代码帮助我们实现并行输出集合元素的功能,但由于并行执行的顺序是不可控的,每次执行的结果可能不一样。

如果必须相同,可以使用forEachOrdered执行终止操作:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);numbers.parallelStream()       .forEachOrdered(out::println);

这里有一个问题,如果结果需要有序,是否违背了我们并行执行的初衷?是的,在这种情况下,显然不需要使用并行流,可以直接使用串行流,否则性能可能会更差,因为所有并行结果最终都被迫排序。

好的,我们先介绍一下Stream接口的相关知识。

2.BaseStream接口。

Stream的父接口是BaseStream,后者是所有流实现的顶层接口,定义如下:

public interface BaseStream<T, S extends BaseStream<T, S>>        extends AutoCloseable {    Iterator<T> iterator();    Spliterator<T> spliterator();    boolean isParallel();    S sequential();    S parallel();    S unordered();    S onClose(Runnable closeHandler);    void close();}

其中,T是流中元素的类型,S是BaseStream的实现类,其中的元素也是T,S也是自己的:

S extends BaseStream<T, S>

是不是有点懵逼?

事实上,这很容易理解。让我们看看界面中S的使用:例如,sequential()和parallel()两个方法都返回了s实例,即支持当前流的串行或并行操作,并返回更改后的流对象。


如果是并行的,必须涉及当前流的拆分,即将一个流分为多个子流,子流必须与父流的类型一致。子流可以继续拆分子流并继续拆分

也就是说,这里的S是BaseStream的实现类,它也是一个流,如Stream、IntStream、LongStream等。

3.Stream接口。

来看下Stream的接口声明:

public interface Stream<T> extends BaseStream<T, Stream<T>>

这里不难理解的是,Stream可以继续分为Stream,我们可以通过它的一些方法来证实:

Stream<T> filter(Predicate<? super T> predicate);<R> Stream<R> map(Function<? super T, ? extends R> mapper);<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);Stream<T> sorted();Stream<T> peek(Consumer<? super T> action);Stream<T> limit(long maxSize);Stream<T> skip(long n);...

这些都是操作流的中间操作,其返回结果必须是流对象本身。

4.关闭流操作。

Basestream实现了Autocloseable接口,即close()方法在流关闭时被调用。与此同时,Basestream还为我们提供了Onclose()方法:

S onClose(Runnable closeHandler);

当调用AutoCloseable的close()接口时,会触发调用流对象的Onclose()方法,但有几点需要注意:

  • onclose()方法将返回流对象本身,即可多次调用该对象。
  • 若调用多种onClose()方法,则按调用顺序触发,但若某种方法有异常,则只会向上抛出第一种异常。
  • 前一种onClose()方法抛出异常,不影响后续onclose()方法的使用。
  • 若多种onClose()方法抛出异常,只显示第一个异常堆栈,而其它异常则被压缩,只显示部分信息。

5.并行流和串行流。

Basestream接口分别提供并行流和串行流两种方法。这两种方法可以任意调用多次或混合调用,但最终只能以最后一种方法调用的返回结果为准。

参考parallel()方法的说明:

Returns an equivalent stream that is parallel. May return

itself, either because the stream was already parallel, or because

the underlying stream state was modified to be parallel.

因此,同样的方法不会产生新的流,而是直接重用当前的流对象。

下面的例子里以最后一次调用parallel()为准,最终是并行地计算sum:

stream.parallel()   .filter(...)   .sequential()   .map(...)   .parallel()   .sum();

ForkJoin框架是JDK7的一个新特征。与ThreadPoolexecutor一样,它还实现了Executor和ExecutorService接口。它使用无限队列来保存要执行的任务,而线程的数量是通过构造函数传输的。如果所需的线程数量没有传输到构造函数中,当前计算机可用的CPU数量将被设置为线程数作为默认值。

ForkJoinPool主要用于分治法(Divide-and-conqueralgorithm)解决问题,典型的应用如_快速排序算法_。这里的要点是ForkJoinPool需要使用相对较少的线程来处理大量的任务。

例如,如果要对1000万个数据进行排序,将该任务分为两个500万排序任务和一个500万数据的合并任务。

以此类推,500万的数据也会被分割,最将设置一个阈值,以规定当数据规模达到多少时,将停止此类分割。例如,当元素数量小于10时,它们将停止分割,并使用插入排序对其进行排序。最后,所有任务加起来大约有2万+。

问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行,想象一下归并排序的过程。

因此,使用ThreadPolexecutor时,使用分治法会出现问题,因为ThreadPolexecutor中的线程无法在任务队列中添加任务,并在等待任务完成后继续执行。使用ForkJoinPool时,可以创建新的任务并悬挂当前任务。此时,线程可以从队列中选择子任务执行。

那么使用ThreadPolexecutor或ForkJoinPool会有什么性能差异呢?

首先,使用ForkJoinPool可以使用数量有限的线程来完成许多具有父子关系的任务,例如使用4个线程来完成超过200万个任务。使用ThreadPolexecutor是不可能的,因为ThreadPolexecutor中的Thread不能选择优先执行子任务。当需要完成200万个有父子关系的任务时,也需要200万个线程,这显然是不可行的。

6.Work Stealing原理:

  • 每个工作线程都有自己的工作队列WorkQueue;
  • 这是一个双端队列dequeue,是私有的线程;
  • ForkJoinTask中fork的子任务将放入运行任务的工作线程的队头,工作线程将按照LIFO的顺序处理工作队列中的任务,即堆栈;
  • 为了最大限度地利用CPU,空闲线程将从其他线程的队列中窃取任务。
  • 但是工作队列的尾部窃取任务,以减少与队列所属线程的竞争;
  • 双端队列操作:push()/pop()仅在其所有者的工作线程中调用,poll()是在其他线程窃取任务时调用的;
  • 当只剩下最后一个任务时,仍然会有通过CAS实现的竞争;

7.从ForkJoinPool的角度看ParallelStream。

Java8为ForkJoinPool添加了一个通用线程池,用于处理未显式提交给任何线程池的任务。它是ForkJoinPool类型的静态元素,默认线程数等于运行计算机上的CPU数。

调用Arrays类添加的新方法时,会发生自动并行化。

例如,用于排序数组的并行快速排序用于并行遍历数组中的元素。自动并行化也用于Java8新添加的StreamAPI。

例如,以下代码用于遍历列表中的元素并执行所需操作:

List<UserInfo> userInfoList =        DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);

列表中的元素的操作将并行执行。foreach方法将为每个元素的计算操作创建一个任务,该任务将由上述forkJoinPool中的comonPool处理。

当然,ThreadPolexecutor也可以完成上述并行计算逻辑,但就代码的可读性和数量而言,ForkJoinPool显然更好。

对于ForkJoinPool通用线程池的线程数量,通常使用默认值,即运行时计算机处理器的数量。也可以通过设置系统属性:-Djava.util.concurent.ForkJoinPool.common.parallelism=N(N为线程数)来调整ForkJoinPool的线程数量。

值得注意的是,当前执行的线程也将用于执行任务,因此最终线程数为N+1,1是当前的主线程

这里有一个问题,如果你在并行流的执行计算中使用阻塞操作,比如I/O,很可能会导致一些问题:

public static String query(String question) {  List<String> engines = new ArrayList<String>();  engines.add("http://www.google.com/?q=");  engines.add("http://duckduckgo.com/?q=");  engines.add("http://www.bing.com/search?q=");  // get element as soon as it is available  Optional<String> result = engines.stream().parallel().map((base) - {    String url = base + question;    // open connection and fetch the result    return WS.url(url).get();  }).findAny();  return result.get();}

这个例子很典型。我们来分析一下:

  • 这种并行流计算操作将由主线程和JVM默认的ForkJoinPool.commonPool()共同执行。
  • map是一种阻塞方法,需要访问HTTP接口并获得其response,因此任何worker线程在执行到此时都会被阻塞并等待结果。
  • 因此,当计算方法在其他地方并行调用时,会受到阻塞等待方法的影响。
  • 目前,ForkJoinPool的实现并没有考虑补偿等待阻塞等待新生成线程的工作worker线程,因此ForkJoinPool.comonPool()中的线程将备用并阻塞等待。

正如我们上面那个列子的情况分析得知,lambda的执行并不是瞬间完成的,所有使用parallel streams的程序都有可能成为阻塞程序的源头, 并且在执行过程中程序中的其他部分将无法访问这些workers,这意味着任何依赖parallel streams的程序在什么别的东西占用着common ForkJoinPool时将会变得不可预知并且暗藏危机。

小结:

  • 在需要处理递归分治算法时,考虑使用ForkJoinPool。
  • 仔细设置不再划分任务的阈值,这对性能有影响。
  • ForkJoinPool中的通用线程池将用于Java8中的一些特性。在某些情况下,需要调整线程池的默认线程数。
  • lambda应尽量避免副作用,即避免基于堆的状态和任何IO的突变。
  • lambda应该互不干扰,也就是说,避免修改数据源(因为这可能会导致线程安全)
  • 避免在流操作生命周期内可能改变的状态。

8.并行流的性能。

并行流框架的性能受以下因素影响:

  • 数据大小:数据足够大,每个管道处理时间足够长,并行有意义;
  • 源数据结构:每个管道操作都是基于初始数据源,通常是集合,不同的集合数据源分割会有一定的消耗;
  • 装箱:处理基本类型比装箱类型快;
  • 核数:默认情况下,核数越多,底层fork/join线程池启动线程越多;
  • 单元处理开销:每个元素花在流中的时间越长,并行操作带来的性能提升越明显;

源数据结构分为以下三组:

  • 性能好:ArrayList,数组或Intstream.range(数据支持随机读取,可轻松分割)
  • 性能一般:HashSet,TreeSet(数据不易公平分解,大部分也可以)
  • 性能差:LinkedList(需要遍历链表,半分解困难),Stream.itrate和Buferedreader.Lines(长度未知,难以分解)

9.NQ模型。

为了确定并行性是否会带来加速,最后两个因素需要考虑:可用数据量和每个数据元素的计算量。

在我们最初的并行分解描述中,我们采用的概念是拆分源,直到分段足够小,以便更有效地解决分段中的问题。分段的大小必须取决于解决的问题取决于每个元素的工作量。

例如,计算字符串长度所涉及的工作远低于计算字符串的SHA-1哈希值。每个元素完成的工作越多,足够使用并行性的阈值就越低。类似地,数据越多,分割就越多,不会与太小阈值发生冲突。

一个简单但有用的并行性能模型是NQ模型,其中N是数据元素的数量,Q是每个元素的工作量。乘积N*Q越大,获得并行加速的可能性就越大。对于Q很小的问题,比如数字求和,你通常希望看到N>10000以获得加速;随着Q的增加,获得加速所需的数据尺寸将会减小。

并行化的许多障碍(如拆分成本、组合成本或顺序敏感性)可以通过Q更高的操作来缓解。虽然拆分一个LinkedList特征的结果可能很差,但只要有足够大的Q,仍然可以获得并行加速。

10.遇到顺序。

遇到顺序是指来源分发元素的顺序是否对计算至关重要。一些来源(如基于哈希的集合和映射)没有有意义的遇到顺序。流标志ORDERED描述了流是否有意义。

JDK集合的spliterator将根据集合规范设置此标志;

一些中间操作可能注入 ORDERED (sorted()) 或清除它 (unordered())。

如果流没有遇到顺序,大多数流操作必须遵守顺序。对于顺序执行,会自动保留遇到的顺序,因为元素会按照遇到的顺序自然处理。

即使在并行执行中,许多操作(无状态中间操作和一些终止操作(如reduce())也不会按顺序产生任何实际成本。

但对于其他操作(有状态中间操作,其语义与遇到顺序相关的终止操作,如findfirst()或foreachordered()),在并行执行中遵守遇到顺序的责任可能非常重要。

如果流有一个定义的遇到顺序,但这个顺序对结果没有意义,可以通过使用unordered()操作删除ORDERED标志,加速包含顺序敏感操作的管道的顺序。

作为对顺序敏感的操作的示例,可以考虑limit(),它会在指定的大小处切断一个流。在顺序执行中实现limit()很简单:保留一个看到多少元素的计数器,然后丢弃任何元素。

但在并行执行中,实现limit()要复杂得多;您需要保留前N个元素。这一要求极大地限制了并行性的使用能力;如果输入分为多个部分,只有在某个部分之前的所有部分都完成了,你才能知道该部分的结果是否会包含在最终结果中。

因此,在你达到目标长度之前,你通常会错误地选择不使用所有可用的核心,或者缓存整个测试结果。

如果流没有遇到顺序,limit()操作可以自由选择任何n个元素,这使得执行效率要高得多。知道元素后,无需任何缓存即可立即发送到下游,线程之间唯一需要执行的协调是发送信号,以确保不超过目标流长度。

另一个不常见的遇到顺序成本的示例是排序。如果遇到有意义的顺序,sorted()操作将实现一个稳定的排序(相同的元素在输入时按相同的顺序出现在输出中),而不需要无序,稳定性(有成本)不是必须的。

distinct()有类似的情况:如果流有一个遇到顺序,distinct()必须发送多个相同的输入元素中的第一个,而对于无序流,它可以发送任何元素——它也可以获得更高效的并行实现。

使用colect()聚合时会遇到类似的情况。如果colect(groupingBy()在无序流上执行,则必须根据输入中出现的顺序向下游收集器提供与任何键对应的元素。

这个顺序通常对应用程序没有意义,任何顺序都没有意义。在这些情况下,最好选择一个并发收集器(如groupingByconcurent(),可以忽略遇到的顺序,让所有线程直接收集到共享的并发数据结构(如concurenthashMap),而不是让每个线程收集到自己的中间映射,然后合并中间映射(这可能会产生很高的成本)。


举报

相关推荐

0 条评论