栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java8流源码解析-串行流

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java8流源码解析-串行流

概述 Stream体系 流和中间阶段关系图

baseStream:对一组可序列化的元素支持串行、并行的聚合操作。
Stream:对一组可序列化的元素支持串行、并行的聚合操作。
PipelineHelper:执行Stream流水线的帮助类,捕获一个Stream流水线的全部信息(输出形状,中间操作,流标记,是否并行等)。
AbstractPipeline:流水线的抽象基本类,提供了流接口的核心实现,管理构造和执行Stream流水线。
ReferencePipeline:对流水线的中间阶段或者源阶段处理的抽象类。
ReferencePipeline.Head:ReferencePipeline源阶段。
ReferencePipeline.StatefulOp:Stream的有状态中间阶段的基本抽象类。
ReferencePipeline.StatelessOp:Stream的无状态中间阶段的基本抽象类。

中间操作和流源都属于ReferencePipeline(即Stream)。

终端操作

TerminalOp:消费输入的流并产生输出结果。
Sink:Consumer类的扩展,通过流水线的阶段传导值,使用相关方法去管理信息大小、流程控制等。
TerminalSink:一个Sink,用于累积元素被消费的状态,当计算结束后允许去获取计算结果。
ForEachOp:一个终端操作,执行流式线和发送输出结果到自己,作为一个TerminalSink。
ReduceOp:一个终端操作,执行流式线和发送输出结果到AccumulatingSink,RedueceOp用来执行化简操作。
MatchOp:一个短路终端操作,流上的所有元素执行谓词操作,决定是否所有的、任何的、没有元素满足谓词判断。
FindOp:一个短路终端操作,搜索流上的元素,当发现时中断执行。

Sink

Sink:Consumer类的扩展,通过流水线的阶段传导值,使用相关方法去管理信息大小、流程控制等。
TerminalSink:一个Sink,用于累积元素被消费的状态,当计算结束后允许去获取计算结果。
FindSink:实现TerminalSink,具有元素发现、当发现目标后请求取消功能。
AccumulatingSink:TerminalSink的一种,实现流上的元素归约和返回计算结果。
Sink.ChainedReference:实现Sink的抽象类,用于创建Sink链。通过begin,end,cancellationRequest连接下游的水槽。
SortedOps.AbstractRefSortingSink:实现Sink的抽象类,实现引用流的排序。
SortedOps.RefSortingSink:实现引用流排序的Sink。
SortedOps.SizedRefSortingSink:实现指定大小的引用流排序的Sink。

Spliterator

拆分器,遍历或者拆分源数据。
拆分器可以用tryAdvance()单个的访问元素或者使用forEachRemaining顺序遍历元素。

Stream总结

Stream流都是获取一个数据源–》数据转换–》执行操作 获取想要的结果。每次转换原有Stream对象不变,返回一个新的stream对象,可以有多次转换,这就允许对流的操作可以向链条一样排列,变成一个流水线/管道(pipeline)。
Stream使用ReferencePipeline记录用户中间操作,它把每一个操作当成一个阶段,它把这个阶段分成三个类型:Head、StatefulOp和StatelessOp,Stream每调用一次操作其实就是生成一个新的阶段。这些阶段通过双向链表的形式组织串联在一起,建立起了阶段的叠加。
阶段叠加起来之后,Stream使用Sink机制把每个操作串联起来。Sink是封装在每个流阶段里面的,包含TerminalSink和ChainedReference,当终止操作执行的时候,内部封装一个accumulatingSink,终止阶段会从TerminalSink开始从下游往上游回溯,其实就是指针迁移,一层一层包装Sink,最终包装出一个Sink链,在Sink链路中,元素是通过accept方法进行发射传递的。构造完成Sink链路之后就可以对每个元素分别执行begin、accept、end操作完成内部迭代了。

流源构建(构建流水线的Head阶段)

Collection类里的stream()方法就是生成流源的:
Collection.java

default Stream stream() {
    return StreamSupport.stream(spliterator(), false);
}

StreamSupport.java

public static  Stream stream(Spliterator spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

可以看到它最终是通过分割迭代器构建了一个流管道的源ReferencePileline.Head,是流水线的开始阶段。通过父类AbstractPipeline属性可以看出流的每个阶段是一个双向链表的节点,而每个阶段都可以反向链接回源阶段。

abstract class AbstractPipeline>
        extends PipelineHelper implements baseStream {
    private static final String MSG_STREAM_linkED = "stream has already been operated upon or closed";
    private static final String MSG_ConSUMED = "source already consumed or closed";

    
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline sourceStage;

    
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline previousStage;

    
    protected final int sourceOrOpFlags;

    
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;

    
    private int depth;

    
    private int combinedFlags;

    
    private Spliterator sourceSpliterator;

    
    private Supplier> sourceSupplier;

    
    private boolean linkedOrConsumed;

    
    private boolean sourceAnyStateful;

    private Runnable sourceCloseAction;

    
    private boolean parallel;

...

}

构建流源比重很大的一个参数是sourceSpliterator,这个迭代器Spliterator是jdk8引入的接口,类似于Iterator,但是它是可分割的,利用分而治之的思想,在并行流的时候可以利用多线程ForkJoin并行操作,而且每次处理集合元素时使用tryAdvance()或forEachRemaining()方法。
流源的构建负责封装原始数据,并初始化双向链表的数据结构。

中间操作阶段

以map操作举例。
ReferencePipeline.java

public final  Stream map(Function mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink opWrapSink(int flags, Sink sink) {
            return new Sink.ChainedReference(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

方法构建了一个StatelessOp,它是无状态中间操作阶段的基类,该类创建一个双向链表的节点,并把之前的节点设置成上个操作阶段,刚刚创建的节点设置成下一个阶段。
AbstractPipeline.java

   AbstractPipeline(AbstractPipeline previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_linkED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;

        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }
参考

java8串行流源码解析

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/731985.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号