
Transposing columns to rows with Spark



This is relatively straightforward using basic Spark SQL functions.

Python

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))

    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
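To see exactly what the melt produces, here is the same wide-to-long transformation written as a plain-Python sketch over the example rows, with no Spark required; `to_long_rows` is a hypothetical helper name introduced only for illustration:

```python
# Plain-Python sketch of the wide-to-long melt above (illustrative only).
# Each input row keeps the "by" columns and emits one (key, val)
# pair per remaining value column.
def to_long_rows(rows, columns, by):
    value_cols = [c for c in columns if c not in by]
    out = []
    for row in rows:
        d = dict(zip(columns, row))
        for c in value_cols:
            out.append(tuple(d[b] for b in by) + (c, d[c]))
    return out

rows = [(1, 0.0, 0.6), (1, 0.6, 0.7)]
print(to_long_rows(rows, ["A", "col_1", "col_2"], ["A"]))
# [(1, 'col_1', 0.0), (1, 'col_2', 0.6), (1, 'col_1', 0.6), (1, 'col_2', 0.7)]
```

Each of the two input rows becomes two output rows, one per value column, which matches what the exploded array of `(key, val)` structs yields in the Spark version.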

Scala

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter { case (c, _) => !by.contains(c) }.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))


Please credit the source when reprinting: reprinted from www.mshxw.com
Original article: https://www.mshxw.com/it/641459.html