与使用 嵌套的RDD或在转换内部执行Spark操作 相比,这里的问题要微妙得多。Spark不允许访问
SparkContext内部操作或转换。
即使您没有显式访问它,也要在闭包内部对其进行引用,并且必须对其进行序列化和携带。这意味着
transformation引用的方法也会
self保留
SparkContext,因此会出现错误。
一种解决方法是使用静态方法:
class model(object): @staticmethod def transformation_function(row): row = row.split(',') return row[0]+row[1] def __init__(self): self.data = sc.textFile('some.csv') def run_model(self): self.data = self.data.map(model.transformation_function)编辑 :
如果您希望能够访问实例变量,可以尝试如下操作:
class model(object): @staticmethod def transformation_function(a_model): delim = a_model.delim def _transformation_function(row): return row.split(delim) return _transformation_function def __init__(self): self.delim = ',' self.data = sc.textFile('some.csv') def run_model(self): self.data = self.data.map(model.transformation_function(self))


