栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

有没有一种方法可以将结果流传输到驱动程序,而无需等待所有分区完成执行?

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

有没有一种方法可以将结果流传输到驱动程序,而无需等待所有分区完成执行?

一般来说,这不是您通常在Spark中执行的操作。通常,我们尝试将通过驱动程序传递的数据量限制为最小。有两个主要原因:

  • 将数据传递给Spark驱动程序很容易成为应用程序的瓶颈。
  • 驱动程序实际上是批处理应用程序中的单点故障。

在正常情况下,您只需要继续工作,写入持久性存储,最后对结果应用进一步的处理步骤。

如果您希望能够迭代访问结果,则可以选择以下几种方法:

  • 使用Spark Streaming。创建一个简单的过程,将数据推送到群集,然后收集每个批次。它简单,可靠,经过测试,并且不需要任何其他基础结构。
  • 使用
    foreach
    /处理数据
    foreachPartition
    ,并在产生数据时将数据推送到外部消息传递系统,并使用另一个过程进行消耗和写入。这需要额外的组件,但从概念上讲可能更容易(您可以使用背压,缓冲结果,从驱动程序分离合并逻辑以最大程度地减少应用程序失败的风险)。
  • Hack Spark累加器。任务完成后,火花累积器会更新,因此您可以分批处理累积的即将到来的数据。

警告以下代码仅是概念验证。 它没有经过适当的测试,很可能是非常不可靠的

AccumulatorParam
使用RXPy的示例

    # results_param.pyfrom rx.subjects import Subjectfrom pyspark import AccumulatorParam, TaskContextclass ResultsParam(AccumulatorParam, Subject):    """An observable accumulator which collects task results"""    def zero(self, v):        return []    def addInPlace(self, acc1, acc2):        # This is executed on the workers so we have to        # merge the results        if (TaskContext.get() is not None and      TaskContext().get().partitionId() is not None): acc1.extend(acc2) return acc1        else: # This is executed on the driver so we discard the results # and publish to self instead for x in acc2:     self.on_next(x) return []

简单的Spark应用程序(Python 3.x):

    # main.pyimport timefrom pyspark import SparkContext, TaskContextsc = SparkContext(master="local[4]")sc.addPyFile("results_param.py")from results_param import ResultsParam# Define accumulatoracc = sc.accumulator([], ResultsParam())# Dummy subscriber acc.accum_param.subscribe(print)def process(x):    """Identity proccess"""    result = x    acc.add([result])    # Add some delay    time.sleep(5)    return resultsc.parallelize(range(32), 8).foreach(process)

这相对简单,但是如果多个任务同时完成,则有使驱动程序不堪重负的风险,因此您必须大幅超额分配驱动程序资源(成比例地达到并行度和任务结果的预期大小)。

  • runJob
    直接使用Scala (不支持Python)。

实际上,Spark实际上是异步获取结果的,只要您不关心顺序,就不需要等待所有数据被处理。例如,

reduce
您可以查看实现Scala。

应该可以使用这种机制将分区推到Python进程中,但是我还没有尝试过。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/646166.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号