栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Java Stream并行化的可视化

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java Stream并行化的可视化

我想用一种解决方案来扩展Tagir的出色答案,该解决方案可以监视_源_ 端甚至中间操作的拆分(受当前流API实现的某些限制):

public static <E> Stream<E> proxy(Stream<E> src) {    Class<Stream<E>> sClass=(Class)Stream.class;    Class<Spliterator<E>> spClass=(Class)Spliterator.class;    return proxy(src, sClass, spClass, StreamSupport::stream);}public static IntStream proxy(IntStream src) {    return proxy(src, IntStream.class, Spliterator.OfInt.class, StreamSupport::intStream);}public static LongStream proxy(LongStream src) {    return proxy(src, LongStream.class, Spliterator.OfLong.class, StreamSupport::longStream);}public static DoubleStream proxy(DoubleStream src) {    return proxy(src, DoubleStream.class, Spliterator.OfDouble.class, StreamSupport::doubleStream);}static final Object EMPTY=new StringBuilder("empty");static <E,S extends baseStream<E,S>, Sp extends Spliterator<E>> S proxy(        S src, Class<S> sc, Class<Sp> spc, BiFunction<Sp,Boolean,S> f) {    final class Node<T> implements InvocationHandler,Runnable,        Consumer<Object>, IntConsumer, LongConsumer, DoubleConsumer {        final Class<? extends Spliterator> type;        Spliterator<T> src;        Object first=EMPTY, last=EMPTY;        Node<T> left, right;        Object currConsumer;        public Node(Spliterator<T> src, Class<? extends Spliterator> type) { this.src = src; this.type=type;        }        private void value(Object t) { if(first==EMPTY) first=t; last=t;        }        public void accept(Object t) { value(t); ((Consumer)currConsumer).accept(t);        }        public void accept(int t) { value(t); ((IntConsumer)currConsumer).accept(t);        }        public void accept(long t) { value(t); ((LongConsumer)currConsumer).accept(t);        }        public void accept(double t) { value(t); ((DoubleConsumer)currConsumer).accept(t);        }        public void run() { System.out.println(); finish().forEach(System.out::println);        }        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Node<T> curr=this; while(curr.right!=null) curr=curr.right; if(method.getName().equals("tryAdvance")||method.getName().equals("forEachRemaining")) {     curr.currConsumer=args[0];     args[0]=curr; } if(method.getName().equals("trySplit")) {     Spliterator s=curr.src.trySplit();     if(s==null) return null;     Node<T> pfx=new Node<>(s, type);     pfx.left=curr.left; curr.left=pfx;     curr.right=new Node<>(curr.src, type);     src=null;     return pfx.create(); } return method.invoke(curr.src, args);        }        Object create() { return Proxy.newProxyInstance(null, new Class<?>[]{type}, this);        }        String pad(String s, int left, int len) { if (len == s.length())     return s; char[] result = new char[len]; Arrays.fill(result, ' '); s.getChars(0, s.length(), result, left); return new String(result);        }        public List<String> finish() { String cur = toString(); if (left == null) {     return Collections.singletonList(cur); } List<String> l = left.finish(); List<String> r = right.finish(); int len1 = l.get(0).length(); int len2 = r.get(0).length(); int totalLen = len1 + len2 + 1; int leftAdd = 0; if (cur.length() < totalLen) {     cur = pad(cur, (totalLen - cur.length()) / 2, totalLen); } else {     leftAdd = (cur.length() - totalLen) / 2;     totalLen = cur.length(); } List<String> result = new ArrayList<>(); result.add(cur); char[] dashes = new char[totalLen]; Arrays.fill(dashes, ' '); Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1         + leftAdd, '_'); int mid = totalLen / 2; dashes[mid] = '/'; dashes[mid + 1] = '\'; result.add(new String(dashes)); Arrays.fill(dashes, ' '); dashes[len1 / 2 + leftAdd] = '|'; dashes[len1 + len2 / 2 + 1 + leftAdd] = '|'; result.add(new String(dashes)); int maxSize = Math.max(l.size(), r.size()); for (int i = 0; i < maxSize; i++) {     String lstr = l.size() > i ? l.get(i) : String.format("%"  + len1 + "s", "");     String rstr = r.size() > i ? r.get(i) : String.format("%"  + len2 + "s", "");     result.add(pad(lstr + " " + rstr, leftAdd, totalLen)); } return result;        }        private Object first() { if(left==null) return first; Object o=left.first(); if(o==EMPTY) o=right.first(); return o;        }        private Object last() { if(right==null) return last; Object o=right.last(); if(o==EMPTY) o=left.last(); return o;        }        public String toString() { Object o=first(), p=last(); return o==EMPTY? "(empty)": "["+o+(o!=p? ".."+p+']': "]");        }    }    Node<E> n=new Node<>(src.spliterator(), spc);    Sp sp=(Sp)Proxy.newProxyInstance(null, new Class<?>[]{n.type}, n);    return f.apply(sp, true).onClose(n);}

它允许用代理程序包装拆分器,该代理器将监视拆分操作和遇到的对象。块处理的逻辑类似于Tagir的逻辑,实际上,我复制了他的结果打印例程。

您可以传入流的源或已附加相同操作的流。(在后一种情况下,您应

.parallel()
及早申请该流)。正如Tagir解释的那样,在大多数情况下,拆分行为取决于源和配置的并行度,因此,在大多数情况下,监视中间状态可能会更改值,但不会更改已处理的块:

try(IntStream is=proxy(IntStream.range(0, 100).parallel())) {    is.filter(i -> i/20%2==0)      .mapToObj(ix->"""+ix+'"')      .forEach(s->{});}

将打印

[0..99]   ___________________________________/________________________________    |   |          [0..49]        [50..99]     _________________/______________    _________________/________________ ||  |  |       [0..24]     [25..49]      [50..74]      [75..99]________/_____        ________/_______        ________/_______        ________/_______     |    |      |      |      |      |      |      |[0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]         ___/_          ___/___          ___/___          ___/___          ___/___          ___/___          ___/___          ___/___       |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |   [0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

try(Stream<String> s=proxy(IntStream.range(0, 100).parallel().filter(i -> i/20%2==0)      .mapToObj(ix->"""+ix+'"'))) {    s.forEach(str->{});}

将打印

      ["0".."99"]         ___________________________________________/___________________________________________ ||     ["0".."49"] ["50".."99"]        ____________________/______________________          ______________________/___________________   ||        |          |       ["0".."19"] ["40".."49"]        ["50".."59"]          ["80".."99"]       ____________/_____________________/______     _______/___________        ____________/________  | |         |         |   |         |      ||      ["0".."11"]  ["12".."19"] (empty)["40".."49"]   ["50".."59"] (empty)        ["80".."86"]["87".."99"]  _____/___   _____/________/__ _____/_____   _____/________/__         _____/__   _____/_____|          | | |         |       |          | | | |         |       |       |         | | |     ["0".."5"] ["6".."11"] ["12".."17"] ["18".."19"] (empty) (empty) ["40".."42"] ["43".."49"] ["50".."55"] ["56".."59"] (empty) (empty) ["80"] ["81".."86"] ["87".."92"] ["93".."99"]

正如我们在此处看到的那样,我们正在监视的结果,

.filter(…).mapToObj(…)
但是块显然是由源确定的,可能会根据过滤器的条件在下游产生空块。

请注意,我们可以将源代码监视与Tagir的收集器监视结合起来:

try(IntStream s=proxy(IntStream.range(0, 100))) {    s.parallel().filter(i -> i/20%2==0)     .boxed().collect(parallelVisualize())     .forEach(System.out::println);}

这将打印(请注意,

collect
输出首先打印):

       [0..99]         ________________________________/_______________________________|          |      [0..49]    [50..99]  ________________/______________  _______________/_______________          |          ||          |     [0..19]    [40..49]    [50..59]   [80..99]         ________/_____       ________/______        _______/_______     ________/_____     |    |     |     |      |     |   |    |[0..11]         [12..19]          (empty)         [40..49]          [50..59]          (empty)       [80..86]        [87..99]         ___/_          ___/___         ___/__          ___/___          ___/___         ___/__        ___/_          ___/___       |      |        |        |       |       |        |        |        |        |       |       |      |      |        |        |   [0..5] [6..11] [12..17] [18..19] (empty) (empty) [40..42] [43..49] [50..55] [56..59] (empty) (empty) [80] [81..86] [87..92] [93..99][0..99]   ___________________________________/________________________________    |   |          [0..49]        [50..99]     _________________/______________    _________________/________________ ||  |  |       [0..24]     [25..49]      [50..74]      [75..99]________/_____        ________/_______        ________/_______        ________/_______     |    |      |      |      |      |      |      |[0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]         ___/_          ___/___          ___/___          ___/___          ___/___          ___/___          ___/___          ___/___       |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |   [0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

我们可以清楚地看到处理的块是如何匹配的,但是在过滤之后,一些块的元素较少,其中一些完全为空。

这是演示的地方,两种监视方式可以在其中发挥重要作用:

try(DoubleStream is=proxy(DoubleStream.iterate(0, i->i+1)).parallel().limit(100)) {    is.boxed()      .collect(parallelVisualize())      .forEach(System.out::println);}        [0.0..99.0]     ___________________________________________________/________________________________________________   |  |       [0.0..49.0]   [50.0..99.0]     _________________________/______________________         _________________________/________________________      |     |       |       |          [0.0..24.0]      [25.0..49.0]       [50.0..74.0]       [75.0..99.0]          ____________/_________     ____________/___________     ____________/___________     ____________/___________  | |   |   |   |   |   |   |      [0.0..11.0]  [12.0..24.0]   [25.0..36.0]   [37.0..49.0]   [50.0..61.0]   [62.0..74.0]   [75.0..86.0]   [87.0..99.0]  _____/___   _____/_____   _____/_____   _____/_____   _____/_____   _____/_____   _____/_____   _____/_____|          | | | | | | | | | | | | | | |     [0.0..5.0] [6.0..11.0] [12.0..17.0] [18.0..24.0] [25.0..30.0] [31.0..36.0] [37.0..42.0] [43.0..49.0] [50.0..55.0] [56.0..61.0] [62.0..67.0] [68.0..74.0] [75.0..80.0] [81.0..86.0] [87.0..92.0] [93.0..99.0]       [0.0..10239.0]    _____________________________/_____   |   |       [0.0..1023.0][1024.0..10239.0]____________________/_______          |       | [1024.0..3071.0]  [3072.0..10239.0]         ____________/______        |         |          [3072.0..6143.0]     [6144.0..10239.0]      ___/_______     | |       [6144.0..10239.0] (empty)

这表明Tagir已经解释了什么,大小未知的流分配不佳,甚至事实

limit(…)
提供了进行良好估计的可能性(实际上,无限+极限在理论上是可预测的),实现没有利用它。

使用的批量大小将源拆分为多个块

1024
1024
每次拆分后将其增加,从而在施加的范围之外创建块
limit
。我们还可以看到每次如何分割前缀。

但是,当我们查看终端拆分输出时,我们可以看到这些多余的块之间已被丢弃,并且第一个块又发生了拆分。由于此块是由中间数组后端组成的,该中间数组已在第一次拆分时由默认实现填充,因此我们在源头没有注意到它,但是我们可以在终端操作中看到此数组已拆分(不足为奇)平衡。

因此,我们需要两种监视方式才能在此处获得完整的图像…



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/495966.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号