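I'd like to extend Tagir's great answer with a solution that monitors the splitting at the _source_ side, or even at intermediate operations (subject to some restrictions imposed by the current stream API implementation):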

    import java.lang.reflect.*;
    import java.util.*;
    import java.util.function.*;
    import java.util.stream.*;

    // Public entry points: one overload per stream type.
    public static <E> Stream<E> proxy(Stream<E> src) {
        Class<Stream<E>> sClass=(Class)Stream.class;
        Class<Spliterator<E>> spClass=(Class)Spliterator.class;
        return proxy(src, sClass, spClass, StreamSupport::stream);
    }
    public static IntStream proxy(IntStream src) {
        return proxy(src, IntStream.class, Spliterator.OfInt.class, StreamSupport::intStream);
    }
    public static LongStream proxy(LongStream src) {
        return proxy(src, LongStream.class, Spliterator.OfLong.class, StreamSupport::longStream);
    }
    public static DoubleStream proxy(DoubleStream src) {
        return proxy(src, DoubleStream.class, Spliterator.OfDouble.class, StreamSupport::doubleStream);
    }
    // Sentinel marking "no element seen yet".
    static final Object EMPTY=new StringBuilder("empty");
    static <E,S extends BaseStream<E,S>, Sp extends Spliterator<E>> S proxy(
            S src, Class<S> sc, Class<Sp> spc, BiFunction<Sp,Boolean,S> f) {

        // One node per chunk; it is the proxy's handler and the consumer wrapper at once.
        final class Node<T> implements InvocationHandler,Runnable,
            Consumer<Object>, IntConsumer, LongConsumer, DoubleConsumer {
            final Class<? extends Spliterator> type;
            Spliterator<T> src;
            Object first=EMPTY, last=EMPTY;
            Node<T> left, right;
            Object currConsumer;
            public Node(Spliterator<T> src, Class<? extends Spliterator> type) {
                this.src = src;
                this.type=type;
            }
            // Remember the first and last element encountered by this chunk.
            private void value(Object t) {
                if(first==EMPTY) first=t;
                last=t;
            }
            public void accept(Object t) {
                value(t); ((Consumer)currConsumer).accept(t);
            }
            public void accept(int t) {
                value(t); ((IntConsumer)currConsumer).accept(t);
            }
            public void accept(long t) {
                value(t); ((LongConsumer)currConsumer).accept(t);
            }
            public void accept(double t) {
                value(t); ((DoubleConsumer)currConsumer).accept(t);
            }
            // Registered via onClose: prints the tree when the stream is closed.
            public void run() {
                System.out.println();
                finish().forEach(System.out::println);
            }
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Follow the right links down to the leaf that owns the live spliterator.
                Node<T> curr=this; while(curr.right!=null) curr=curr.right;
                if(method.getName().equals("tryAdvance")||method.getName().equals("forEachRemaining")) {
                    curr.currConsumer=args[0];
                    args[0]=curr;
                }
                if(method.getName().equals("trySplit")) {
                    Spliterator s=curr.src.trySplit();
                    if(s==null) return null;
                    Node<T> pfx=new Node<>(s, type);
                    pfx.left=curr.left; curr.left=pfx;
                    curr.right=new Node<>(curr.src, type);
                    src=null;
                    return pfx.create();
                }
                return method.invoke(curr.src, args);
            }
            Object create() {
                return Proxy.newProxyInstance(null, new Class<?>[]{type}, this);
            }
            String pad(String s, int left, int len) {
                if (len == s.length())
                    return s;
                char[] result = new char[len];
                Arrays.fill(result, ' ');
                s.getChars(0, s.length(), result, left);
                return new String(result);
            }
            public List<String> finish() {
                String cur = toString();
                if (left == null) {
                    return Collections.singletonList(cur);
                }
                List<String> l = left.finish();
                List<String> r = right.finish();
                int len1 = l.get(0).length();
                int len2 = r.get(0).length();
                int totalLen = len1 + len2 + 1;
                int leftAdd = 0;
                if (cur.length() < totalLen) {
                    cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
                } else {
                    leftAdd = (cur.length() - totalLen) / 2;
                    totalLen = cur.length();
                }
                List<String> result = new ArrayList<>();
                result.add(cur);

                char[] dashes = new char[totalLen];
                Arrays.fill(dashes, ' ');
                Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1 + leftAdd, '_');
                int mid = totalLen / 2;
                dashes[mid] = '/';
                dashes[mid + 1] = '\\';
                result.add(new String(dashes));

                Arrays.fill(dashes, ' ');
                dashes[len1 / 2 + leftAdd] = '|';
                dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
                result.add(new String(dashes));

                int maxSize = Math.max(l.size(), r.size());
                for (int i = 0; i < maxSize; i++) {
                    String lstr = l.size() > i ? l.get(i) : String.format("%" + len1 + "s", "");
                    String rstr = r.size() > i ? r.get(i) : String.format("%" + len2 + "s", "");
                    result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
                }
                return result;
            }
            private Object first() {
                if(left==null) return first;
                Object o=left.first();
                if(o==EMPTY) o=right.first();
                return o;
            }
            private Object last() {
                if(right==null) return last;
                Object o=right.last();
                if(o==EMPTY) o=left.last();
                return o;
            }
            public String toString() {
                Object o=first(), p=last();
                return o==EMPTY? "(empty)": "["+o+(o!=p? ".."+p+']': "]");
            }
        }
        Node<E> n=new Node<>(src.spliterator(), spc);
        Sp sp=(Sp)Proxy.newProxyInstance(null, new Class<?>[]{n.type}, n);
        return f.apply(sp, true).onClose(n);
    }

It wraps a spliterator in a proxy that monitors the split operations and the encountered objects. The chunk-handling logic is similar to Tagir's; in fact, I copied his result-printing routine.
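
To isolate the core trick, here is a much smaller sketch of the same idea: a dynamic proxy that forwards every `Spliterator` call to the real spliterator and only counts `trySplit` invocations. The names `SplitCounter` and `counting` are made up for this illustration, and unlike the code above it does not wrap the returned prefix, so splits of the prefix go unnoticed.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;

class SplitCounter {
    /** Wraps target in a dynamic proxy that counts every trySplit() call (hypothetical helper). */
    @SuppressWarnings("unchecked")
    public static <T> Spliterator<T> counting(Spliterator<T> target, AtomicInteger splits) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("trySplit")) {
                splits.incrementAndGet();
            }
            try {
                // Delegate everything, including default methods, to the real spliterator.
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        };
        return (Spliterator<T>) Proxy.newProxyInstance(
                SplitCounter.class.getClassLoader(),
                new Class<?>[] { Spliterator.class }, handler);
    }

    public static void main(String[] args) {
        AtomicInteger splits = new AtomicInteger();
        Spliterator<Integer> sp =
                counting(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8).spliterator(), splits);
        Spliterator<Integer> prefix = sp.trySplit();
        System.out.println("splits=" + splits.get()
                + ", prefix size=" + prefix.estimateSize()
                + ", remaining=" + sp.estimateSize());
    }
}
```

The full solution above goes further by also replacing the consumer passed to `tryAdvance` and `forEachRemaining`, which is how it learns the first and last element of each chunk.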
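You can pass in either the source of a stream or a stream that already has operations appended. (In the latter case, you should apply `.parallel()` to the stream as early as possible.) As Tagir explained, in most cases the splitting behavior depends on the source and the configured parallelism, so monitoring an intermediate state will usually change the values but not the processed chunks: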

    try(IntStream is=proxy(IntStream.range(0, 100).parallel())) {
        is.filter(i -> i/20%2==0)
          .mapToObj(ix->"\""+ix+'"')
          .forEach(s->{});
    }

prints the following split tree (listed here one level per line; the `/\` and `|` connector rows are omitted):

    [0..99]
    [0..49] [50..99]
    [0..24] [25..49] [50..74] [75..99]
    [0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99]
    [0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

whereas

    try(Stream<String> s=proxy(IntStream.range(0, 100).parallel().filter(i -> i/20%2==0)
                                        .mapToObj(ix->"\""+ix+'"'))) {
        s.forEach(str->{});
    }

prints (again one tree level per line, connector rows omitted):

    ["0".."99"]
    ["0".."49"] ["50".."99"]
    ["0".."19"] ["40".."49"] ["50".."59"] ["80".."99"]
    ["0".."11"] ["12".."19"] (empty) ["40".."49"] ["50".."59"] (empty) ["80".."86"] ["87".."99"]
    ["0".."5"] ["6".."11"] ["12".."17"] ["18".."19"] (empty) (empty) ["40".."42"] ["43".."49"] ["50".."55"] ["56".."59"] (empty) (empty) ["80"] ["81".."86"] ["87".."92"] ["93".."99"]

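As we can see here, we are now monitoring the result of `.filter(…).mapToObj(…)`, but the chunks are clearly still determined by the source, which may produce empty chunks downstream depending on the filter's condition.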
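
That the chunk boundaries come from the source can be checked without any stream pipeline by splitting the range's spliterator by hand. The following is a minimal sketch; `RangeSplitDemo` and `chunkSizes` are names invented for this illustration, and the resulting sizes reflect how the range spliterator behaves in current JDK implementations rather than anything the specification guarantees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.IntStream;

class RangeSplitDemo {
    /** Splits sp up to 'depth' times per branch; returns chunk sizes from left to right. */
    public static List<Long> chunkSizes(Spliterator.OfInt sp, int depth) {
        List<Long> sizes = new ArrayList<>();
        collect(sp, depth, sizes);
        return sizes;
    }

    private static void collect(Spliterator.OfInt sp, int depth, List<Long> out) {
        // trySplit() hands back the prefix; sp itself keeps the remaining suffix.
        Spliterator.OfInt prefix = depth > 0 ? sp.trySplit() : null;
        if (prefix == null) {
            out.add(sp.estimateSize());
            return;
        }
        collect(prefix, depth - 1, out);
        collect(sp, depth - 1, out);
    }

    public static void main(String[] args) {
        for (int depth = 1; depth <= 3; depth++) {
            System.out.println(chunkSizes(IntStream.range(0, 100).spliterator(), depth));
        }
    }
}
```

At depth 3 the sizes alternate between 12 and 13, which corresponds to the `[0..11] [12..24] …` row of the monitored output.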
Note that we can combine the source monitoring with Tagir's collector monitoring:

    try(IntStream s=proxy(IntStream.range(0, 100))) {
        s.parallel().filter(i -> i/20%2==0)
         .boxed().collect(parallelVisualize())
         .forEach(System.out::println);
    }

This prints two trees; note that the `collect` output is printed first. Both are listed here one level per line, without the connector rows:

    [0..99]
    [0..49] [50..99]
    [0..19] [40..49] [50..59] [80..99]
    [0..11] [12..19] (empty) [40..49] [50..59] (empty) [80..86] [87..99]
    [0..5] [6..11] [12..17] [18..19] (empty) (empty) [40..42] [43..49] [50..55] [56..59] (empty) (empty) [80] [81..86] [87..92] [93..99]

    [0..99]
    [0..49] [50..99]
    [0..24] [25..49] [50..74] [75..99]
    [0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99]
    [0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

We can clearly see how the processed chunks match, but after the filtering, some chunks have fewer elements and some of them are entirely empty.
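
The relationship between a source chunk and what survives the filter can be reproduced sequentially. This sketch applies the same predicate, `i/20%2==0`, to a few of the source chunks and formats the survivors the way the tree nodes are labelled; `FilteredChunks` and `survivors` are hypothetical names used only here.

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

class FilteredChunks {
    /** Describes what is left of the source chunk [from, to) after the filter. */
    public static String survivors(int from, int to) {
        IntSummaryStatistics st = IntStream.range(from, to)
                .filter(i -> i / 20 % 2 == 0)
                .summaryStatistics();
        if (st.getCount() == 0) {
            return "(empty)";
        }
        return st.getMin() == st.getMax()
                ? "[" + st.getMin() + "]"
                : "[" + st.getMin() + ".." + st.getMax() + "]";
    }

    public static void main(String[] args) {
        // Boundaries of the eight source chunks on the fourth tree level.
        int[] bounds = {0, 12, 25, 37, 50, 62, 75, 87, 100};
        for (int i = 0; i + 1 < bounds.length; i++) {
            System.out.println("[" + bounds[i] + ".." + (bounds[i + 1] - 1) + "] -> "
                    + survivors(bounds[i], bounds[i + 1]));
        }
    }
}
```

For example, the source chunk `[25..36]` lies entirely inside the rejected range 20..39, so it turns into `(empty)` downstream.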
Here is a demonstration of a case where having both ways of monitoring makes a real difference:

    try(DoubleStream is=proxy(DoubleStream.iterate(0, i->i+1)).parallel().limit(100)) {
        is.boxed()
          .collect(parallelVisualize())
          .forEach(System.out::println);
    }

This prints two trees, listed here one level per line without the connector rows. In the second tree, each row holds the two children of the right-hand node above it:

    [0.0..99.0]
    [0.0..49.0] [50.0..99.0]
    [0.0..24.0] [25.0..49.0] [50.0..74.0] [75.0..99.0]
    [0.0..11.0] [12.0..24.0] [25.0..36.0] [37.0..49.0] [50.0..61.0] [62.0..74.0] [75.0..86.0] [87.0..99.0]
    [0.0..5.0] [6.0..11.0] [12.0..17.0] [18.0..24.0] [25.0..30.0] [31.0..36.0] [37.0..42.0] [43.0..49.0] [50.0..55.0] [56.0..61.0] [62.0..67.0] [68.0..74.0] [75.0..80.0] [81.0..86.0] [87.0..92.0] [93.0..99.0]

    [0.0..10239.0]
    [0.0..1023.0] [1024.0..10239.0]
    [1024.0..3071.0] [3072.0..10239.0]
    [3072.0..6143.0] [6144.0..10239.0]
    [6144.0..10239.0] (empty)

This demonstrates what Tagir already explained: streams with an unknown size split poorly. Even though `limit(…)` offers the possibility of a good estimate (in fact, infinite + limit is theoretically predictable), the implementation does not take advantage of it.
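
The "unknown size" part is visible directly on the source spliterators. The sketch below compares the range source from the earlier examples with the `iterate` source; `SizedCheck` and `describe` are names invented for this illustration.

```java
import java.util.Spliterator;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;

class SizedCheck {
    /** Reports whether a spliterator is SIZED and what size it estimates. */
    public static String describe(Spliterator<?> sp) {
        return (sp.hasCharacteristics(Spliterator.SIZED) ? "SIZED" : "unsized")
                + ", estimate=" + sp.estimateSize();
    }

    public static void main(String[] args) {
        // A range knows exactly how many elements it holds.
        System.out.println(describe(IntStream.range(0, 100).spliterator()));
        // An iterate() source is infinite and reports Long.MAX_VALUE as "unknown".
        System.out.println(describe(DoubleStream.iterate(0, i -> i + 1).spliterator()));
    }
}
```

Without a usable size estimate, the source cannot pick a balanced split point and has to fall back on buffering a prefix.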
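The source is split into chunks using a batch size of `1024`, which is increased by `1024` after each split, creating chunks way outside the range imposed by `limit`. We can also see how a prefix is split off each time.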
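
The growing batches can be observed in isolation. This sketch uses `Spliterators.spliteratorUnknownSize` over an endless iterator as a stand-in for the `iterate` source (an assumption: the stream's own spliterator may be a different class, although it follows the same batching strategy in the JDK versions I am aware of) and counts the elements of each split-off prefix. `BatchSizes` and `prefixSizes` are hypothetical names, and the batch sizes are an implementation detail, not specified behavior.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;

class BatchSizes {
    /** Splits an unsized, iterator-backed spliterator 'splits' times and counts each prefix. */
    public static List<Long> prefixSizes(int splits) {
        Iterator<Integer> endless = new Iterator<Integer>() {
            private int value;
            @Override public boolean hasNext() { return true; }
            @Override public Integer next() { return value++; }
        };
        Spliterator<Integer> sp =
                Spliterators.spliteratorUnknownSize(endless, Spliterator.ORDERED);
        List<Long> sizes = new ArrayList<>();
        for (int i = 0; i < splits; i++) {
            // Each split copies a batch into an array and returns it as the prefix.
            Spliterator<Integer> prefix = sp.trySplit();
            if (prefix == null) {
                break;
            }
            long[] count = {0};
            prefix.forEachRemaining(x -> count[0]++);
            sizes.add(count[0]);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(prefixSizes(4));
    }
}
```

The prefixes are counted by traversal rather than via `estimateSize()`, since the estimate reported for such array-backed prefixes is not guaranteed to equal the batch length.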
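But when we look at the terminal split output, we can see that these excess chunks have been dropped in between, and that another splitting of the first chunk has happened. Since this chunk is backed by an intermediate array that the default implementation filled on the first split, we don't notice it at the source, but at the terminal action we can see that this array has been split in a well-balanced way (unsurprisingly).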
So we need both ways of monitoring to get the full picture here…



