一般来说,这不是您通常在Spark中执行的操作。通常,我们尝试将通过驱动程序传递的数据量限制为最小。有两个主要原因:
- 将数据传递给Spark驱动程序很容易成为应用程序的瓶颈。
- 驱动程序实际上是批处理应用程序中的单点故障。
在正常情况下,您只需要继续工作,写入持久性存储,最后对结果应用进一步的处理步骤。
如果您希望能够迭代访问结果,则可以选择以下几种方法:
- 使用Spark Streaming。创建一个简单的过程,将数据推送到群集,然后收集每个批次。它简单,可靠,经过测试,并且不需要任何其他基础结构。
- 使用
foreach
/处理数据foreachPartition
,并在产生数据时将数据推送到外部消息传递系统,并使用另一个过程进行消耗和写入。这需要额外的组件,但从概念上讲可能更容易(您可以使用背压,缓冲结果,从驱动程序分离合并逻辑以最大程度地减少应用程序失败的风险)。 - Hack Spark累加器。任务完成后,火花累积器会更新,因此您可以分批处理累积的即将到来的数据。
警告 : 以下代码仅是概念验证。 它没有经过适当的测试,很可能是非常不可靠的。
AccumulatorParam使用RXPy的示例
# results_param.pyfrom rx.subjects import Subjectfrom pyspark import AccumulatorParam, TaskContextclass ResultsParam(AccumulatorParam, Subject): """An observable accumulator which collects task results""" def zero(self, v): return [] def addInPlace(self, acc1, acc2): # This is executed on the workers so we have to # merge the results if (TaskContext.get() is not None and TaskContext().get().partitionId() is not None): acc1.extend(acc2) return acc1 else: # This is executed on the driver so we discard the results # and publish to self instead for x in acc2: self.on_next(x) return []
简单的Spark应用程序(Python 3.x):
# main.pyimport timefrom pyspark import SparkContext, TaskContextsc = SparkContext(master="local[4]")sc.addPyFile("results_param.py")from results_param import ResultsParam# Define accumulatoracc = sc.accumulator([], ResultsParam())# Dummy subscriber acc.accum_param.subscribe(print)def process(x): """Identity proccess""" result = x acc.add([result]) # Add some delay time.sleep(5) return resultsc.parallelize(range(32), 8).foreach(process)这相对简单,但是如果多个任务同时完成,则有使驱动程序不堪重负的风险,因此您必须大幅超额分配驱动程序资源(成比例地达到并行度和任务结果的预期大小)。
runJob
直接使用Scala (不支持Python)。
实际上,Spark实际上是异步获取结果的,只要您不关心顺序,就不需要等待所有数据被处理。例如,
reduce您可以查看实现Scala。
应该可以使用这种机制将分区推到Python进程中,但是我还没有尝试过。



