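Personally, I would go with a Python UDF and not bother with anything else:
- Vectors are not native SQL types, so there will be performance overhead one way or another. In particular, the conversion takes two steps: data is first converted from the external type to a row, and then from the row to the internal representation using the generic RowEncoder.
- Any downstream ML Pipeline will be much more expensive than a simple conversion. Moreover, it requires the reverse of the process described above.
But if you really want other options, here they are: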
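- Scala UDF with a Python wrapper:
Install sbt following the instructions on the project site.
Create a Scala package with the following structure:
    .
    ├── build.sbt
    └── udfs.scala
Edit build.sbt (adjust to reflect your Scala and Spark versions):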
    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.4.4",
      "org.apache.spark" %% "spark-mllib" % "2.4.4"
    )
Edit udfs.scala:
    package com.example.spark.udfs

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector

    object udfs {
      // Converts an array<double> column into an ML DenseVector
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }
Package:
    sbt package
and include (or the equivalent, depending on your Scala version):
    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
as an argument for --driver-class-path when starting the shell or submitting the application.
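Define a wrapper in PySpark:
    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext

    def as_vector(col):
        sc = SparkContext.getOrCreate()
        # Look up the Scala UDF on the JVM side (the jar must be on the class path)
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        # Apply it to the given column and wrap the result as a PySpark Column
        return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
Test: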
    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()

    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+

    with_vec.printSchema()

    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
- Dump the data to a JSON format reflecting the DenseVector schema and read it back:
    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT

    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))

    schema = StructType([StructField("v", VectorUDT())])

    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )

    with_parsed_vector.show()

    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+

    with_parsed_vector.printSchema()

    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)
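To make the JSON round trip less opaque, here is a small standard-library sketch of the per-row document that the to_json(struct(...)) expression is expected to produce for a dense vector. The helper name dense_vector_json is hypothetical, and the exact formatting Spark emits may differ slightly; the point is only the shape: a field v holding type 1 and the values array.

```python
import json


def dense_vector_json(values):
    """Build a JSON document shaped like a dense vector wrapped in field "v"."""
    payload = {
        "v": {
            "type": 1,  # 1 marks a dense vector, 0 a sparse one
            "values": [float(x) for x in values],
        }
    }
    # Compact separators, so no spaces are emitted between tokens
    return json.dumps(payload, separators=(",", ":"))


row_json = dense_vector_json([-1.0, -2.0, -3.0])
print(row_json)  # {"v":{"type":1,"values":[-1.0,-2.0,-3.0]}}
```

Because VectorUDT is stored as a struct, from_json can read such a document back with the StructField("v", VectorUDT()) schema, which is what produces the parsed_vector column above.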



