好吧,细节在于魔鬼。要了解发生这种情况的原因,我们将不得不仔细研究PySpark序列化器。首先让我们
SparkContext使用默认设置进行创建:
from pyspark import SparkContextsc = SparkContext("local", "foo")并检查什么是默认的序列化器:
sc.serializer## AutoBatchedSerializer(PickleSerializer())sc.serializer.bestSize## 65536
它告诉我们三件事:
- 这是
AutoBatchedSerializer
序列化器 - 它
PickleSerializer
用来执行实际的工作 bestSize
批处理的序列化的是65536字节
快速浏览一下源代码,您会发现此序列化调整了运行时时间序列化的记录数,并尝试使批大小保持小于10*
bestSize。重要的一点是,并非单个分区中的所有记录都同时被序列化。
我们可以通过以下实验进行检查:
from operator import addbd = sc.broadcast({})rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value)rdd.map(id).distinct().count()## 1rdd.cache().count()## 10rdd.map(id).distinct().count()## 2如您所见,即使在序列化-反序列化之后的这个简单示例中,我们也得到了两个不同的对象。您可以直接使用观察到类似的行为
pickle:
v = {}vs = [v, v, v, v]v1, *_, v4 = pickle.loads(pickle.dumps(vs))v1 is v4## True(v1_, v2_), (v3_, v4_) = ( pickle.loads(pickle.dumps(vs[:2])), pickle.loads(pickle.dumps(vs[2:])))v1_ is v4_## Falsev3_ is v4_## True取消酸洗后,在同一批引用中序列化的值是同一对象。来自不同批次的值指向不同的对象。
在实践中,Spark具有多个序列化和不同的序列化策略。例如,您可以使用无限大小的批次:
from pyspark.serializers import BatchedSerializer, PickleSerializerrdd_ = (sc.parallelize(range(10), 1).map(lambda _: bd.value) ._reserialize(BatchedSerializer(PickleSerializer())))rdd_.cache().count()rdd_.map(id).distinct().count()## 1
您可以通过将
serializer和/或
batchSize参数传递给
SparkContext构造函数来更改序列化程序:
sc = SparkContext( "local", "bar", serializer=PickleSerializer(), # Default serializer # Unlimited batch size -> BatchedSerializer instead of AutoBatchedSerializer batchSize=-1 )sc.serializer## BatchedSerializer(PickleSerializer(), -1)
选择不同的序列化程序和批处理策略会导致不同的权衡(速度,序列化任意对象的能力,内存要求等)。
您还应该记住,Spark中的广播变量不会在执行程序线程之间共享,因此同一工作线程可以同时存在多个反序列化副本。
此外,如果执行需要改组的转换,您将看到与此类似的行为。



