栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Reader#lines()由于其拆分器中不可配置的批量大小策略而严重并行化

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Reader#lines()由于其拆分器中不可配置的批量大小策略而严重并行化

这是的答案,在的源代码中有详细说明

Spliterators.IteratorSpliterator
,供以下人员使用
BufferedReader#lines()

    @Override    public Spliterator<T> trySplit() {                Iterator<? extends T> i;        long s;        if ((i = it) == null) { i = it = collection.iterator(); s = est = (long) collection.size();        }        else s = est;        if (s > 1 && i.hasNext()) { int n = batch + BATCH_UNIT; if (n > s)     n = (int) s; if (n > MAX_BATCH)     n = MAX_BATCH; Object[] a = new Object[n]; int j = 0; do { a[j] = i.next(); } while (++j < n && i.hasNext()); batch = j; if (est != Long.MAX_VALUE)     est -= j; return new ArraySpliterator<>(a, 0, j, characteristics);        }        return null;    }

还值得注意的是常量:

static final int BATCH_UNIT = 1 << 10;  // batch array size incrementstatic final int MAX_BATCH = 1 << 25;  // max batch array size;

因此,在我使用6,000个元素的示例中,由于批次大小步长为1024,因此我只能得到三个批次。这可以准确地解释我的观察结果:最初使用了三个核心,然后减少为两个,然后在较小的批次完成时使用一个。同时,我尝试了一个包含60,000个元素的修改示例,然后获得了几乎100%的CPU利用率。

为了解决我的问题,我开发了下面的代码,该代码使我可以将任何现有流转换为一个流,然后

Spliterator#trySplit
将其划分为指定大小的批处理。从我的问题中将其用于用例的最简单方法是这样的:

toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)

在较低的级别上,下面的类是分隔器包装器,它可以更改包装的分隔器的

trySplit
行为,并使其他方面保持不变。


import static java.util.Spliterators.spliterator;import static java.util.stream.StreamSupport.stream;import java.util.Comparator;import java.util.Spliterator;import java.util.function.Consumer;import java.util.stream.Stream;public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {  private final Spliterator<T> spliterator;  private final int batchSize;  private final int characteristics;  private long est;  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {    final int c = toWrap.characteristics();    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;    this.spliterator = toWrap;    this.est = est;    this.batchSize = batchSize;  }  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {    this(toWrap, toWrap.estimateSize(), batchSize);  }  public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {    return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);  }  @Override public Spliterator<T> trySplit() {    final HoldingConsumer<T> holder = new HoldingConsumer<>();    if (!spliterator.tryAdvance(holder)) return null;    final Object[] a = new Object[batchSize];    int j = 0;    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));    if (est != Long.MAX_VALUE) est -= j;    return spliterator(a, 0, j, characteristics());  }  @Override public boolean tryAdvance(Consumer<? super T> action) {    return spliterator.tryAdvance(action);  }  @Override public void forEachRemaining(Consumer<? super T> action) {    spliterator.forEachRemaining(action);  }  @Override public Comparator<? super T> getComparator() {    if (hasCharacteristics(SORTED)) return null;    throw new IllegalStateException();  }  @Override public long estimateSize() { return est; }  @Override public int characteristics() { return characteristics; }  static final class HoldingConsumer<T> implements Consumer<T> {    Object value;    @Override public void accept(T value) { this.value = value; }  }}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/407044.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号