
PySpark: split multiple array columns into rows


Spark >= 2.4

You can replace the zip_ udf with the arrays_zip function:
from pyspark.sql.functions import arrays_zip, col, explode

(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))
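For intuition, arrays_zip pairs elements of the two arrays positionally into structs, and explode then emits one row per struct. A plain-Python sketch of the same transformation (the sample row is hypothetical, mirroring a schema with columns a, b, c, d where b and c are arrays):

```python
# Hypothetical sample row: a and d are scalars, b and c are same-length arrays
row = {"a": 1, "b": [1, 2, 3], "c": ["x", "y", "z"], "d": "foo"}

# arrays_zip("b", "c"): pair elements by position into structs
tmp = [{"b": bi, "c": ci} for bi, ci in zip(row["b"], row["c"])]

# explode("tmp"): one output row per struct, scalars repeated
result = [(row["a"], t["b"], t["c"], row["d"]) for t in tmp]
print(result)  # [(1, 1, 'x', 'foo'), (1, 2, 'y', 'foo'), (1, 3, 'z', 'foo')]
```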

Spark < 2.4

DataFrames and a UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust types to reflect data types
        StructField("first", IntegerType()),
        StructField("second", IntegerType())
    ])))

(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
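The UDF body is just Python's built-in zip: for two sample arrays it produces the list of (first, second) pairs that the StructType schema above describes (the sample arrays are hypothetical):

```python
# Same lambda as the UDF body, applied to plain lists
zip_ = lambda x, y: list(zip(x, y))

# Each pair becomes a struct with fields "first" and "second"
pairs = zip_([1, 2, 3], [4, 5, 6])
print(pairs)  # [(1, 4), (2, 5), (3, 6)]
```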

RDDs

(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))

Both solutions are inefficient due to Python communication overhead. If the data size is fixed, you can do something like this:

from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
    DataFrame.unionAll,
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")
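The reduce over unionAll builds one projection per index i (taking the i-th element of each array) and stacks the n projections. A plain-Python sketch of that logic (sample data is hypothetical):

```python
from functools import reduce

# Hypothetical rows: (a, b-array, c-array, d)
rows = [("x1", [1, 2, 3], ["a", "b", "c"], "d1")]
n = 3

# One "projection" per index i, analogous to df.select(..., getItem(i), ...)
projections = (
    [(a, b[i], c[i], d) for a, b, c, d in rows]
    for i in range(n)
)

# unionAll just concatenates the projected row sets
result = reduce(lambda acc, p: acc + p, projections)
print(result)
# [('x1', 1, 'a', 'd1'), ('x1', 2, 'b', 'd1'), ('x1', 3, 'c', 'd1')]
```

Note that this approach reorders rows by index rather than by source row; in Spark the row order of a DataFrame is not guaranteed anyway.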

or even:

from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

This should be significantly faster compared to a UDF or RDD. Generalized to support an arbitrary number of columns:

# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))
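The same generalization in plain Python, to show how the struct-per-index expansion scales to any list of column names (the function name, row, and data below are illustrative, not part of the original answer):

```python
def zip_and_explode_py(row, *colnames, n):
    # Build one output record per index i, taking the i-th element of every
    # named array column (mirrors struct(col(c).getItem(i)) per index)
    return [{c: row[c][i] for c in colnames} for i in range(n)]

# Hypothetical row with two array columns of length 3
row = {"b": [1, 2, 3], "c": ["x", "y", "z"]}
out = zip_and_explode_py(row, "b", "c", n=3)
print(out)  # [{'b': 1, 'c': 'x'}, {'b': 2, 'c': 'y'}, {'b': 3, 'c': 'z'}]
```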


Reproduced from www.mshxw.com; original article: https://www.mshxw.com/it/574874.html