在Java编程中,ArrayList估计是最常用的类之一,日常当作数组来用。对于数组的操作,进行会涉及遍历、过滤数据、修改数据等操作,这些操作最简单的就是for循环遍历了,然而却不够优雅。
常规模式下最简单的遍历方式,如下所示。
ArrayListarr = new ArrayList<>(); for (String str : arr) { System.out.println(str); }
那有没有更简单的方式呢?如果你学过Javascripts,对于数组操作,最常用的就是流式操作了,如下所示:
let arr = [1,2,3] arr.map(x => console.log(x)); // 打印出每个元素 arr.filter(x => x > 2).map(x => console.log(x)) // 打印出大于2的元素
那么,Java里面有没有这种类似的操作呢?对于Java这种非常老练完善的语言,当然也是会有的,这就是stream。
在IDEA中,实例化一个ArrayList对象,在调用对象方法的时候,IDEA会提示一系列类似stream().map()、stream().filter()、stream().forEach()的方法。这些方法可能就是我们要找的方法呢,所以我们来看看是不是。
首先,数组对象调用的是stream()方法,Command+B(macOS的快捷键)以下这个方法,看看它的实现
public interface Collectionextends Iterable { default Stream stream() { return StreamSupport.stream(spliterator(), false); } }
这个方法是Collection接口下的一个默认方法(不知道默认方法?先去了解下),为啥会直接跳到这个方法,当然是因为ArrayList实现了List接口,List接口又继承了Collection接口,而且List 和 ArrayList都没有重写该方法,所以上面那段就是源码了。
stream()方法返回的就是一个实现了Stream接口的对象,那先看看Stream接口长啥样。
// A sequence of elements supporting sequential and parallel aggregate operations public interface Streamextends baseStream > { // Returns a stream consisting of the elements of this stream that match the given predicate. // 返回一个由符合给定条件的元素组成的流 Stream filter(Predicate super T> predicate); // Returns a stream consisting of the results of applying the given function to the elements of this stream // 对每个元素进行函数操作,然后返回所有元素组成的流 Stream map(Function super T, ? extends R> mapper); // Performs an action for each element of this stream // 对每个元素执行一个操作 void forEach(Consumer super T> action); }
根据注释,这几个就是我们需要找的方法,由于是抽象方法,具体干了啥还要找到实现这个接口的类。
spliterator()回到stream()这个方法,很简单的内容,只有一行,先执行spliterator(),再执行StreamSupport.stream()方法。
好的,看看spliterator(),继续Command+B,跳到了这里。
public interface Collectionextends Iterable { @Override default Spliterator spliterator() { return Spliterators.spliterator(this, 0); } }
这里需要注意下,spliterator()是默认方法,但是被ArrayList重写了,所以具体内容,得去ArrayList下面找,下面这个才是真正的。
public class ArrayListextends AbstractList implements List , RandomAccess, Cloneable, java.io.Serializable { @Override public Spliterator spliterator() { return new ArrayListSpliterator(0, -1, 0); } final class ArrayListSpliterator implements Spliterator { private int index; // current index, modified on advance/split private int fence; // -1 until used; then one past last index private int expectedModCount; // initialized when fence set ArrayListSpliterator(int origin, int fence, int expectedModCount) { this.index = origin; this.fence = fence; this.expectedModCount = expectedModCount; } public void forEachRemaining(Consumer super E> action) { int i, hi, mc; // hoist accesses and checks from loop Object[] a; if (action == null) throw new NullPointerException(); if ((a = elementData) != null) { // 这里的elementData就是ArrayList的元素数组 if ((hi = fence) < 0) { mc = modCount; hi = size; } else mc = expectedModCount; // 以下是遍历数组 if ((i = index) >= 0 && (index = hi) <= a.length) { for (; i < hi; ++i) { @SuppressWarnings("unchecked") E e = (E) a[i]; action.accept(e); // 关键代码!!! } if (modCount == mc) return; } } throw new ConcurrentModificationException(); } } }
以上,spliterator()函数创建了实例化了一个对象ArrayListSpliterator,ArrayListSpliterator的其他代码删除了,只保留了本文会涉及的 forEachRemaining()函数,先剧透下,这是调起stream一系列处理的入口,也就是action.accept(e)。
ReferencePipeline.Head再回到stream()方法,看看StreamSupport.stream()方法,这里new了一个ReferencePipeline.Head对象,传的参数就是上面的ArrayListSpliterator对象。
public final class StreamSupport {
// Creates a new sequential or parallel Stream from a Spliterator.
public static Stream stream(Spliterator spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
}
然后再看ReferencePipeline.Head 这个类(Head类是ReferencePipeline的内部类,同时继承了ReferencePipeline)。
abstract class ReferencePipelineextends AbstractPipeline > implements Stream { static class Head extends ReferencePipeline { // Constructor for the source stage of a Stream. Head(Spliterator> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } @Override final Sink opWrapSink(int flags, Sink sink) { throw new UnsupportedOperationException(); } // Optimized sequential terminal operations for the head of the pipeline @Override public void forEach(Consumer super E_OUT> action) { if (!isParallel()) { sourceStageSpliterator().forEachRemaining(action); } else { super.forEach(action); } } } }
为啥这个内部类叫Head呢,因为是这个pipeline的头部,stream流式处理数据,数据是一级一级往下流动,那么首尾跟中间的就不一样。比如这个类就重写了opWrapSink方法,抛出异常,禁止调用。
如果只是遍历数组,不做其他操作,那么直接stream().forEach() 就可以,流就结束了,调用的就是这个类重写的forEach方法
// 这里的sourceStageSpliterator()返回的就是上面提到的ArrayListSpliterator对象,这里forEachRemaining方法就出现了!!(后面还会出现的) sourceStageSpliterator().forEachRemaining(action);ReferencePipeline
上面的Head类的构造函数,调用了父类的构造函数super(source, sourceFlags, parallel),也就是ReferencePipeline的构造函数。ReferencePipeline又继承了AbstractPipeline,所以最终还调用了AbstractPipeline的构造方法。
abstract class ReferencePipelineextends AbstractPipeline > implements Stream { ReferencePipeline(Spliterator> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } } abstract class AbstractPipeline > extends PipelineHelper implements baseStream { AbstractPipeline(Spliterator> source, int sourceFlags, boolean parallel) { this.previousStage = null; this.sourceSpliterator = source; this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE; this.depth = 0; this.parallel = parallel; } }
好了,stream()方法内容就执行结束了,总结下,
- 第一步,把ArrayList封装成了ArrayListSpliterator,这个类有个遍历元素的方法forEachRemaining
- 第二步,创建了个ReferencePipeline.Head对象,由于Head继承了ReferencePipeline,ReferencePipeline又实现了Stream接口,所以具备了Stream的一系列方法。
AbstractPipeline构造函数里面,记录下来了ArrayListSpliterator对象,也就是sourceSpliterator,depth = 0 表示管道第一级。
然后,来看看Stream的filter方法,看看是怎么样的。
Stream.filterReferencePipeline实现了接口Stream,所以filter方法必然在ReferencePipeline类或者其子类中实现了,以下就是filter源码了。
abstract class ReferencePipelineextends AbstractPipeline > implements Stream { @Override public final Stream filter(Predicate super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp (this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedReference (sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; } }
filter方法看起来复杂,其实很简单,创建了个继承了抽象类StatelessOp的匿名类并实例化了匿名类的对象(匿名内部类),并实现了抽象方法opWrapSink,opWrapSink函数也是一样,创建了个继承了抽象类Sink.ChainedReference的匿名类并实例化了匿名类的对象。
先来看看map方法,会发现惊人的相似。
abstract class ReferencePipelineextends AbstractPipeline > implements Stream { @Override public final Stream map(Function super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp (this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedReference (sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; } }
对比filter和map函数的源码,都是返回StatelessOp对象,opWrapSink方法也是返回Sink.ChainedReference对象,filter重写了begin和accept方法,map只重写了accept方法,而且accept方法内容不同。同样看其他方法,基本都是accept实现内容不一样,所以,这个accept方法肯定就是整个管道的关键,后面看看是怎么调用到这个accept方法的。
好了,回到StatelessOp,看看它是啥。
abstract static class StatelessOpextends ReferencePipeline { //Construct a new Stream by appending a stateless intermediate operation to an existing stream. StatelessOp(AbstractPipeline, E_IN, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, opFlags); assert upstream.getOutputShape() == inputShape; } @Override final boolean opIsStateful() { return false; } }
StatelessOp 是个继承了ReferencePipeline的抽象类,仅仅是为了让opIsStateful始终返回false,没别的功能。
这里有点要注意,stream() 方法返回的是ReferencePipeline.Head类的对象,filter()返回的却是StatelessOp子类的对象。
再来看StatelessOp的构造函数,直接调用父类ReferencePipeline的构造函数,又调用了AbstractPipeline的构造函数。
abstract class AbstractPipeline> extends PipelineHelper implements baseStream { AbstractPipeline(AbstractPipeline, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_linkED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; } }
这里的,previousStage就是 stream() 返回的 Head对象或者是StatelessOp子类的对象,nextStage指向本对象,sourceStage指向上一个stage的sourceStage,depth自增1。
这个流程有点像双向链表,stream().filter().map() 这行代码,意味着创建一个Head对象(A),一个包含了filter逻辑的StatelessOp(B),一个包含了map逻辑的StatelessOp(C),示意图如下:
好了,无论后面跟多少了filter、map这种,都是这样不断包装,不断延长这个链表了,那问题来了,啥时候执行呢??
前面说到,stream() 返回的是这个流式操作的第一级,还是做了特殊处理Head对象,有头就有尾,尾巴说不定就是启动执行的。
Stream.forEachforEach 就是尾巴之一,除此之外还有collect、reduce、count等,都可以当作尾巴,可以终止stream,并执行前面封装的一系列操作。先看看forEach的源码。
abstract class ReferencePipelineextends AbstractPipeline > implements Stream { @Override public void forEach(Consumer super P_OUT> action) { evaluate(ForEachOps.makeRef(action, false)); } }
这里要特别注意,StatelessOp没有重写forEach方法,Head重写了forEach方法,所以执行的是不一样的,要看什么时候调用的forEach,stream()后面直接调用forEach就是Head的(前面分析过了),除此之外都是ReferencePipeline的forEach方法。
这个forEach方法,看起来很简单哦,就一行代码,但是里面隐藏的东西很多,一步一步来看。
先看这个ForEachOps.makeRef(action, false),这个action就是forEach要做的事。
final class ForEachOps {
public static TerminalOp makeRef(Consumer super T> action, boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
abstract static class ForEachOp implements TerminalOp, TerminalSink {
@Override
public Void evaluateSequential(PipelineHelper helper, Spliterator spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
static final class OfRef extends ForEachOp {
final Consumer super T> consumer;
OfRef(Consumer super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
@Override
public void accept(T t) {
consumer.accept(t);
}
}
}
}
ForEachOps.makeRef(action, false) 这条语句返回了个ForEachOp.OfRef对象,这个对象继承了ForEachOp,实现了接口TerminalOp、TerminalSink (TerminalOp 从名字上看,这就是终止操作呀)。
好了,再执行evaluate()函数了,
abstract class AbstractPipeline> extends PipelineHelper implements baseStream { final R evaluate(TerminalOp terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_linkED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); } }
本文分析的都不涉及parallel,所以这里会执行的就是terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())),也就是ForEachOp的evaluateSequential方法了。这里的this就是最后一个中间流操作。
这里先执行sourceSpliterator(terminalOp.getOpFlags()),上源码(从函数名大概也猜到了是返回原数据的spliterator,也就是最开始的ArrayListSpliterator)
abstract class AbstractPipeline> extends PipelineHelper implements baseStream { private Spliterator> sourceSpliterator(int terminalFlags) { // Get the source spliterator of the pipeline Spliterator> spliterator = null; if (sourceStage.sourceSpliterator != null) { spliterator = sourceStage.sourceSpliterator; sourceStage.sourceSpliterator = null; } else if (sourceStage.sourceSupplier != null) { spliterator = (Spliterator>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; } else { throw new IllegalStateException(MSG_CONSUMED); } if (isParallel() && sourceStage.sourceAnyStateful) { // Adapt the source spliterator, evaluating each stateful op // in the pipeline up to and including this pipeline stage. // The depth and flags of each pipeline stage are adjusted accordingly. int depth = 1; for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage) { int thisOpFlags = p.sourceOrOpFlags; if (p.opIsStateful()) { depth = 0; if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { // Clear the short circuit flag for next pipeline stage // This stage encapsulates short-circuiting, the next // stage may not have any short-circuit operations, and // if so spliterator.forEachRemaining should be used // for traversal thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; } spliterator = p.opevaluateParallelLazy(u, spliterator); // Inject or clear SIZED on the source pipeline stage // based on the stage's spliterator thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED) ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED; } p.depth = depth++; p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); } } if (terminalFlags != 0) { // Apply flags from the terminal operation to last pipeline stage combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); } return spliterator; } }
果然,最后返回的是spliterator,而且是来自于sourceStage.sourceSpliterator,也就是创建Head时传进来的ArrayListSpliterator。剩下中间长长的代码,跟现有的分析没啥关系,先跳过。
evaluateSequential方法也只有一行
publicVoid evaluateSequential(PipelineHelperhelper, Spliterator spliterator) { return helper.wrapAndCopyInto(this, spliterator).get(); }
由于这里的helper指向的其实是StatelessOp子类的对象,也就是在AbstractPipeline或者ReferencePipeline里面实现了wrapAndCopyInto方法,实际找下是在AbstractPipeline实现的。
abstract class AbstractPipeline> extends PipelineHelper implements baseStream { @Override final > S wrapAndCopyInto(S sink, Spliterator spliterator) { copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink; } @Override final void copyInto(Sink wrappedSink, Spliterator spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } } @Override final Sink wrapSink(Sink sink) { for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink ) sink; } }
关键的来了!!
wrapAndCopyInto方法里面,先执行了wrapSink(Objects.requireNonNull(sink)), Objects.requireNonNull可以直接忽略。
这里的sink是啥?有点忘了吧,找回去,evaluateSequential方法里面的传参this, 也就是OfRef对象,因为其间接实现了TerminalSink,TerminalSink继承了Sink,这里的类型S也是要继承了Sink的接口,这就对上了。
然后就是wrapSink方法,AbstractPipeline.this 获取的是AbstractPipeline的this指针,然后遍历所有的stage,也就封装的所有操作。那这里的opWrapSink调用的是哪里的方法呢?
回想下,所有的操作流程都封装了成了StatelessOp或Head,而且每次封装的时候都重写了opWrapSink方法,所以这就是filter、map等函数实现的方法呀。
所以这里遍历,就把所有的stage的opWrapSink方法都调用了一遍,回看opWrapSink方法,它返回了Sink.ChainedReference对象。
然后来看Sink.ChainedReference的构造函数
interface Sinkextends Consumer { abstract static class ChainedReference implements Sink { protected final Sink super E_OUT> downstream; public ChainedReference(Sink super E_OUT> downstream) { this.downstream = Objects.requireNonNull(downstream); } } }
回到wrapSink方法,里面的for循环是逆序遍历的,所以每次创建个新的Sink.ChainedReference对象,就把downstream指向上一个遍历的。
stream().filter().map().forEach() 这行代码,意味着创建一个Head对象(A),一个包含了filter逻辑的StatelessOp(B),一个包含了map逻辑的StatelessOp(C),一个包含forEach逻辑的OfRef(D),wrapSink之后,示意图如下:
A哪去了?A是Head,重写了opWrapSink方法会抛异常呢,往上翻Head的源码,所以wrapSink 遍历的时候depth > 0, 所以没有把A进来。
好了,然后执行copyInto方法了,
第一步判断是否是短路操作StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags()),不是短路操作才执行,这是个啥,从来没设置过,所以肯定不是。后续再看这个有啥用。
短路操作,有符合条件就不在执行后面的,比如anyMatch() allMatch() noneMatch() findFirst() findAny()等方法,找到了符合条件的就不再遍历剩下的数据
然后就是三行代码:
wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end();
第一行,spliterator.getExactSizeIfKnown() 取的是ArrayList大小,这里的begin函数好像没啥用?
第三行,wrappedSink.end() 也没看出来有啥用
第二行,spliterator往回查,就是stream()里面封装的ArrayListSpliterator,调用forEachRemaining,这个的源码上面贴了,就是遍历ArrayList的元素,每个元素都执行一遍“action.accept(e);”
这里的action就是wrappedSink,还记得filter函数里面重写的accept方法吗?就这里调用了!!!!
再来看看filter函数源码,wrappedSink指向的就是这里实例化的Sink.ChainedReference对象,然后调用各自定义的accept方法,filter里面,如果predicate.test(u) 为true,则执行downstream的accept方法,downstream是啥?这个指向下一个操作呀;如果为false呢,后续的操作不执行了,整个“action.accept(e);”就执行完了,再来下一个元素。
public final Streamfilter(Predicate super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp (this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedReference (sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; }
再来看map函数,直接看最关键的:
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
先执行map传的函数,然后直接调用下一个操作。
执行到最后个操作,就是forEach,包装成了OfRef对象,也重写了accept方法
static final class OfRefextends ForEachOp { final Consumer super T> consumer; OfRef(Consumer super T> consumer, boolean ordered) { super(ordered); this.consumer = consumer; } @Override public void accept(T t) { consumer.accept(t); } }
终于执行完了!
forEach是最简单的终结操作,接下来看看复杂点的collect吧
collect函数有重载,如下
public finalR collect(Collector super P_OUT, A, R> collector) public final R collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner)
看下Collector接口的定义,会发现就是下面三个参数的封装,所以只看下面这种。
@Override
public final R collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner) {
return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
}
collect方法与forEach方法有差异的地方就是,collect调用的ReduceOps的方法,forEach调用的是ForEachOps的方法。
由于上面分析的很细,下面就简单点写,流程其实差不多
- makeRef方法new了个ReduceOp对象,重写了makeSink方法,这个方法会new一个ReducingSink对象,这个对象封装了collect传进来的三个参数。
public staticTerminalOp makeRef(Supplier seedFactory, BiConsumer accumulator, BiConsumer reducer) { class ReducingSink extends Box implements AccumulatingSink { @Override public void begin(long size) { state = seedFactory.get(); } @Override public void accept(T t) { accumulator.accept(state, t); } @Override public void combine(ReducingSink other) { reducer.accept(state, other.state); } } return new ReduceOp (StreamShape.REFERENCE) { @Override public ReducingSink makeSink() { return new ReducingSink(); } }; }
ReduceOp 也重写了evaluateSequential方法(都实现了TerminalOp接口),执行了makeSink方法,返回了ReducingSink类,等同于上面的OfRef类
@Override publicR evaluateSequential(PipelineHelper helper, Spliterator spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator).get(); }
剩下的操作跟forEach的都一样,直到copyInto方法,之前认为没啥用的begin方法,这里有用了,ReducingSink的begin方法是有实际意义的。调用了seedFactory参数的get方法。
然后在forEachRemaining调用accept方法就结束了,combine只有在并行运行的时候才会用到,具体怎么用的,不知道,待有空研究。



