Both a `Pipeline` and a `PipelineModel` are valid `PipelineStages`, and as such can be combined into a single `Pipeline`. For example:
```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])
pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])
```

You can combine either the `Pipelines`:
```python
Pipeline(stages=[
    pipeline1, pipeline2,
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
```

```
+-----+---+---+---+---+---------+---------+-----------------+
|label|x1 |x2 |x3 |x4 |features1|features2|features         |
+-----+---+---+---+---+---------+---------+-----------------+
|1.0  |0  |1  |1  |0  |[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|0.0  |1  |0  |0  |1  |[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+
```
or the pre-fitted `PipelineModels`:
```python
model1 = pipeline1.fit(df)
model2 = pipeline2.fit(df)

Pipeline(stages=[
    model1, model2,
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
```

```
+-----+---+---+---+---+---------+---------+-----------------+
|label| x1| x2| x3| x4|features1|features2|         features|
+-----+---+---+---+---+---------+---------+-----------------+
|  1.0|  0|  1|  1|  0|[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|  0.0|  1|  0|  0|  1|[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+
```
That said, the approach I would recommend is to join the data first, and then `fit` and `transform` the whole `DataFrame`.
See also:
- Apache Spark: Add new fitted stage to an existing PipelineModel without fitting again



