Java8新增的Stream API是一个强大的特性,它可以简化集合中的常用操作,包括过滤、映射、分组等。下面就来实现一个简易版的Stream。
从表面上看,流似乎和列表很接近,但实际上它们有着本质的区别。
列表是多个元素的容器,当列表被创建出来时,它里面的每个元素也已经被创建出来了。
流是一种计算结构,它封装了内部元素如何产生的计算过程,但是并没有包含实际的元素数据。换句话说,当一个流被创建出来时,它内部的元素并没有被创建,但是我们可以通过调用流的方法来按顺序生成每个元素。
所以,流具有惰性计算的特性,它可以表示普通列表无法表示的一些结构,如无限流。
流的定义流的定义看起来很像链表,一个流由两部分组成:第一个元素(first)和剩余元素组成的流(remain)。定义如下:
public interface Stream{ T first(); Stream remain(); static Stream create(Supplier firstSupplier, Supplier > remainSupplier) { return new Stream<>() { @Override public T first() { return firstSupplier.get(); } @Override public Stream remain() { return remainSupplier.get(); } }; } }
这种递归的定义非常有利于使用递归算法来操作流。下面可以看到,流的大多数相关操作都是用递归算法实现的。
假设我们已经有了一个流,那么如何获取流中的元素呢?首先调用first来获取第一个元素,然后调用remain().first()来获取第二个元素,依此类推:
Streamstream = ... Integer first = stream.first(); // 第一个元素 Integer second = stream.remain().first(); // 第二个元素 Integer third = stream.remain().remain().first(); // 第三个元素 // 依此类推...
当然,我们不会用这种方法来访问流中的元素。具体如何访问,请继续往下看。
空流空流是最简单的流,无法从空流中获取任何元素。空流也标志着一个流的结束。下面是空流的实现:
Stream> EMPTY = create(
() -> {throw new IllegalStateException("当前流已结束");},
() -> {throw new IllegalStateException("当前流已结束");}
);
@SuppressWarnings("unchecked")
static Stream empty() {
return (Stream) EMPTY;
}
default boolean end() {
return this == EMPTY;
}
有限流的生成
有限流可以通过多种方式生成,包括从数组生成、从迭代器生成、从集合生成。
从数组生成流@SafeVarargs static从迭代器生成流Stream of(T... arr) { return fromArray(0, arr); } static Stream fromArray(int startIndex, T[] arr) { return startIndex == arr.length ? empty() : create(() -> arr[startIndex], () -> fromArray(startIndex + 1, arr)); }
static从集合生成流Stream fromIterator(Iterator iterator) { return iterator.hasNext() ? create(iterator::next, () -> fromIterator(iterator)) : empty(); }
static示例Stream fromCollection(Collection collection) { return fromIterator(collection.iterator()); }
Stream无限流的生成s1 = Stream.of(1, 2, 3); // 从数组生成 Stream s2 = Stream.fromIterator(List.of(1, 2, 3).iterator()); // 从迭代器生成 Stream s3 = Stream.fromCollection(Set.of(1, 2, 3)); // 从集合生成
无限流意味着流中的元素个数没有限制,也就是永远都不会结束,所以end方法调用永远为false。有以下两种方法生成无限流。
从工厂方法生成流static从生成器生成流Stream fromSupplier(Supplier supplier) { return create(supplier, () -> fromSupplier(supplier)); }
static示例Stream fromGenerator(T initial, UnaryOperator generator) { return create(() -> initial, () -> generate(generator.apply(initial), generator)); }
Stream遍历流中的元素s1 = Stream.fromSupplier(() -> 1); // 无限个1组成的流 Stream s2 = Stream.fromGenerator(1, n -> n + 1); // 全体自然数组成的流
知道了如何创建流,那么如何遍历或输出流中的元素呢?可以实现下面的forEach方法:
default void forEach(Consumerconsumer) { Stream s = this; while (!s.end()) { consumer.accept(s.first()); s = s.remain(); } }
然后就可以像下面这样输出流中的元素:
Stream流的截断和偏移s = Stream.of(1, 2, 3, 4, 5); s.forEach(System.out::println); // 输出1 2 3 4 5
上面的forEach方法只适用于有限流,如果在无限流上调用forEach方法,会导致死循环。所以,我们需要对无限流进行截取操作,这样就能做到遍历无限流的一部分。
default Streamlimit(int n) { return n <= 0 || end() ? empty() : create(this::first, () -> remain().limit(n - 1)); } default Stream skip(int n) { return end() || n <= 0 ? this : remain().skip(n - 1); }
limit用于提取流的前n个元素,skip用于忽略流的前n个元素,有了这两个方法,我们就能随心所欲地截取任何流中的任意一段。
流的变换操作熟悉Java8 Stream API的读者一定用过map和filter这两个常用的流操作,下面我们就来实现它们。
mapmap用于对流中的所有元素进行转换操作。
default Stream map(Functionfiltermapper) { return end() ? empty() : create(() -> mapper.apply(first()), () -> remain().map(mapper)); }
filter用于过滤流中的元素。
default Stream示例filter(Predicate predicate) { if (end()) { return empty(); } T e = first(); if (predicate.test(e)) { return Stream.create(() -> e, () -> remain().filter(predicate)); } else { return remain().filter(predicate); } }
Stream流的聚合操作s = Stream.of(1, 2, 3, 4, 5, 6) .filter(n -> n % 2 == 0) // 2, 4, 6 .map(n -> "hello " + n); // hello 2, hello 4, hello 6
有时候我们像将整个流聚合成某种数据结构,如列表、集合等,这就需要用到流的聚合操作。
collect对流进行自定义聚合操作。
default U collect(U initial, BiFunction accumulator) {
U result = initial;
Stream s = this;
while (!s.end()) {
result = accumulator.apply(result, s.first());
s = s.remain();
}
return result;
}
toList
将流转换成列表。
default ListtoSettoList() { return collect(new ArrayList<>(), (list, e) -> { list.add(e); return list; }); }
将流转换成集合。
default SettoMaptoSet() { return collect(new HashSet<>(), (set, e) -> { set.add(e); return set; }); }
将流转换成Map。
defaultcountMap toMap(Function keyGenerator, Function valueGenerator) { return collect(new HashMap<>(), (map, e) -> { map.put(keyGenerator.apply(e), valueGenerator.apply(e)); return map; }); }
对流中的元素进行计数。
default int count() {
return collect(0, (cnt, e) -> cnt + 1);
}
流的高级操作
下面是流的一些高级操作。
concatconcat用于将两个流首尾连接在一起。
staticStream concat(Stream s1, Stream s2) { return s1.end() ? s2 : create(s1::first, () -> concat(s1.remain(), s2)); } default Stream concat(Stream s) { return concat(this, s); }
示例:
Streaminterleaves1 = Stream.of(1, 2, 3, 4); Stream s2 = Stream.of(5, 6, 7); Stream s = s1.concat(s2); // 1, 2, 3, 4, 5, 6, 7
interleave用于将两个流交错连接在一起。
staticStream interleave(Stream s1, Stream s2) { return s1.end() ? s2 : create(s1::first, () -> interleave(s2, s1.remain())); } default Stream interleave(Stream s) { return interleave(this, s); }
示例:
StreamflatMaps1 = Stream.of(1, 3, 5, 7); Stream s2 = Stream.of(2, 4, 6); Stream s = s1.interleave(s2); // 1, 2, 3, 4, 5, 6, 7
flatMap用于将流中的每个元素都映射成一个流,然后将所有流连接起来。
default Stream flatMap(Function> mapper) { return collect(empty(), (s, e) -> s.concat(mapper.apply(e))); }
示例:
Stream后记s = Stream.of(10, 20) .flatMap(n -> Stream.of(n + 1, n + 2, n + 3)); // 11, 12, 13, 21, 22, 23
完整代码:https://github.com/byx2000/simple-stream



