火花 > = 2.3,> = 3.0
由于
OneHotEnprer不推荐使用Spark
2.3,而推荐使用
OneHotEnprerEstimator。如果您使用的是最新版本,请修改
enprer代码
from pyspark.ml.feature import oneHotEnprerEstimatorenprer = oneHotEnprerEstimator( inputCols=["gender_numeric"], outputCols=["gender_vector"])
在Spark 3.0中,此变体已重命名为
OneHotEnprer:
from pyspark.ml.feature import oneHotEnprerenprer = oneHotEnprer( inputCols=["gender_numeric"], outputCols=["gender_vector"])
另外
StringIndexer已扩展为支持多个输入列:
StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])
火花 <2.3
好吧,您可以编写一个UDF,但是为什么呢?已经有很多工具可以处理此类任务:
from pyspark.sql import Rowfrom pyspark.ml.linalg import DenseVectorrow = Row("gender", "foo", "bar")df = sc.parallelize([ row("0", 3.0, DenseVector([0, 2.1, 1.0])), row("1", 1.0, DenseVector([0, 1.1, 1.0])), row("1", -1.0, DenseVector([0, 3.4, 0.0])), row("0", -3.0, DenseVector([0, 4.1, 0.0]))]).toDF()首先
StringIndexer。
from pyspark.ml.feature import StringIndexerindexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)indexed_df = indexer.transform(df)indexed_df.drop("bar").show()## +------+----+--------------+## |gender| foo|gender_numeric|## +------+----+--------------+## | 0| 3.0|0.0|## | 1| 1.0|1.0|## | 1|-1.0|1.0|## | 0|-3.0|0.0|## +------+----+--------------+下一个
OneHotEnprer:
from pyspark.ml.feature import oneHotEnprerenprer = oneHotEnprer(inputCol="gender_numeric", outputCol="gender_vector")enpred_df = enprer.transform(indexed_df)enpred_df.drop("bar").show()## +------+----+--------------+-------------+## |gender| foo|gender_numeric|gender_vector|## +------+----+--------------+-------------+## | 0| 3.0|0.0|(1,[0],[1.0])|## | 1| 1.0|1.0| (1,[],[])|## | 1|-1.0|1.0| (1,[],[])|## | 0|-3.0|0.0|(1,[0],[1.0])|## +------+----+--------------+-------------+VectorAssembler:
from pyspark.ml.feature import VectorAssemblerassembler = VectorAssembler( inputCols=["gender_vector", "bar", "foo"], outputCol="features")enpred_df_with_indexed_bar = (vector_indexer .fit(enpred_df) .transform(enpred_df))final_df = assembler.transform(enpred_df)
如果
bar包含分类变量,则可以
VectorIndexer用来设置所需的元数据:
from pyspark.ml.feature import VectorIndexervector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")
但事实并非如此。
最后,您可以使用管道包装所有内容:
from pyspark.ml import Pipelinepipeline = Pipeline(stages=[indexer, enprer, vector_indexer, assembler])model = pipeline.fit(df)transformed = model.transform(df)
可以说,与从头开始编写所有内容相比,这是一种更加健壮和简洁的方法。有一些警告,特别是当您需要不同数据集之间的一致编码时。你可以阅读更多的官方文档中的
StringIndexer和
VectorIndexer。
获得相当的输出的另一种方式是
RFormula其中:
RFormula产生要素的向量列和标签的双列或字符串列。就像在R中使用公式进行线性回归时一样,字符串输入列将被一键编码,数字列将被转换为双精度。如果标签列的类型为字符串,则首先将其转换为doubleStringIndexer。如果Dataframe中不存在label列,则将从公式中指定的响应变量创建输出label列。
from pyspark.ml.feature import RFormularf = RFormula(formula="~ gender + bar + foo - 1")final_df_rf = rf.fit(df).transform(df)
如您所见,它更加简洁,但很难编写,因此无法进行大量自定义。不过,像这样的简单管道的结果将是相同的:
final_df_rf.select("features").show(4, False)## +----------------------+## |features |## +----------------------+## |[1.0,0.0,2.1,1.0,3.0] |## |[0.0,0.0,1.1,1.0,1.0] |## |(5,[2,4],[3.4,-1.0]) |## |[1.0,0.0,4.1,0.0,-3.0]|## +----------------------+final_df.select("features").show(4, False)## +----------------------+## |features |## +----------------------+## |[1.0,0.0,2.1,1.0,3.0] |## |[0.0,0.0,1.1,1.0,1.0] |## |(5,[2,4],[3.4,-1.0]) |## |[1.0,0.0,4.1,0.0,-3.0]|## +----------------------+关于您的问题:
制作具有与我可以在Spark SQL查询中使用的功能类似的UDF(我想可以通过其他方式)
它就像其他任何UDF一样。确保使用受支持的类型,除此之外,一切都应该正常工作。
从上述映射获取RDD并将其作为新列添加到user_data数据帧?
from pyspark.ml.linalg import VectorUDTfrom pyspark.sql.types import StructType, StructFieldschema = StructType([StructField("features", VectorUDT(), True)])row = Row("features")result.map(lambda x: row(DenseVector(x))).toDF(schema)注意事项 :
对于Spark 1.x,请替换
pyspark.ml.linalg为
pyspark.mllib.linalg。



