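Python

It is not possible to modify a single nested field. You have to recreate the whole structure. In this particular case, the simplest solution is to use cast.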
First, a bunch of imports:

    from collections import namedtuple
    from pyspark.sql.functions import col
    from pyspark.sql.types import (
        ArrayType, LongType, StringType, StructField, StructType)
And example data:

    Record = namedtuple("Record", ["a", "b", "c"])

    # sc is the SparkContext provided by the PySpark shell
    df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])

Let's confirm that the schema is the same as in your case:

    df.printSchema()

    root
     |-- array_field: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a: string (nullable = true)
     |    |    |-- b: long (nullable = true)
     |    |    |-- c: long (nullable = true)
You can define the new schema, for example, as a string:

    str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

    df.select(col("array_field").cast(str_schema)).printSchema()

    root
     |-- array_field: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a_renamed: string (nullable = true)
     |    |    |-- b: long (nullable = true)
     |    |    |-- c: long (nullable = true)

or as a DataType:

    struct_schema = ArrayType(StructType([
        StructField("a_renamed", StringType()),
        StructField("b", LongType()),
        StructField("c", LongType())
    ]))

    df.select(col("array_field").cast(struct_schema)).printSchema()

    root
     |-- array_field: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a_renamed: string (nullable = true)
     |    |    |-- b: long (nullable = true)
     |    |    |-- c: long (nullable = true)

Scala
The same techniques can be used in Scala:

    // In spark-shell the implicits needed for toDF and $"..." are already imported.
    case class Record(a: String, b: Long, c: Long)

    val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")

    val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

    df.select($"array_field".cast(strSchema))

or

    import org.apache.spark.sql.types._

    val structSchema = ArrayType(StructType(Seq(
      StructField("a_renamed", StringType),
      StructField("b", LongType),
      StructField("c", LongType)
    )))

    df.select($"array_field".cast(structSchema))

Possible improvements:
If you use an expressive data manipulation or JSON processing library, it could be easier to dump the data types to a dict or a JSON string and take it from there, for example (Python / toolz):

    from toolz.curried import pipe, assoc_in, update_in, map
    from operator import attrgetter

    # Update name to "a_updated" if name is "a"
    rename_field = update_in(
        keys=["name"], func=lambda x: "a_updated" if x == "a" else x)

    updated_schema = pipe(
        # Get schema of the field as a dict
        df.schema["array_field"].jsonValue(),
        # Update fields with rename
        update_in(
            keys=["type", "elementType", "fields"],
            func=lambda x: pipe(x, map(rename_field), list)),
        # Load schema from dict
        StructField.fromJson,
        # Get data type
        attrgetter("dataType"))

    df.select(col("array_field").cast(updated_schema)).printSchema()
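If toolz is not available, the same dict-level rename can be sketched with plain Python. This is only an illustration: the helper name rename_struct_fields is hypothetical, and the hard-coded field_json is an assumption about what df.schema["array_field"].jsonValue() returns for the example data, written out so the snippet runs without a Spark session.

```python
import copy

# Assumed shape of df.schema["array_field"].jsonValue() for the example data;
# hard-coded here so the sketch runs without Spark.
field_json = {
    "name": "array_field",
    "type": {
        "type": "array",
        "elementType": {
            "type": "struct",
            "fields": [
                {"name": "a", "type": "string", "nullable": True, "metadata": {}},
                {"name": "b", "type": "long", "nullable": True, "metadata": {}},
                {"name": "c", "type": "long", "nullable": True, "metadata": {}},
            ],
        },
        "containsNull": True,
    },
    "nullable": True,
    "metadata": {},
}


def rename_struct_fields(field_json, renames):
    """Return a copy of field_json with the array element's struct fields renamed.

    `renames` maps old field names to new ones; names not in it are kept.
    (Hypothetical helper, not part of PySpark.)
    """
    updated = copy.deepcopy(field_json)  # leave the input dict untouched
    for field in updated["type"]["elementType"]["fields"]:
        field["name"] = renames.get(field["name"], field["name"])
    return updated


updated_json = rename_struct_fields(field_json, {"a": "a_updated"})
new_names = [f["name"] for f in updated_json["type"]["elementType"]["fields"]]
print(new_names)  # ['a_updated', 'b', 'c']
```

With a live session, the resulting dict would then be loaded back with StructField.fromJson(updated_json).dataType and passed to cast, as in the toolz version.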


