栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark DataFrame 添加列总结

Spark DataFrame 添加列总结

我的原创地址:https://dongkelun.com/2021/05/19/localSparkHiveWithKerberos/

前言

因添加列在平时可能会经常用到,但是长时间不用,可能会忘记应该用哪个函数,这样再重新查找比较耽误时间,于是总结代码进行备忘。主要总结:

  • 根据现有的列添加
  • 添加自增ID
  • 添加一列常量
  • 添加当前时间
  • 转换为timestamp类型
  • 转换为date类型
代码
package com.dkl.blog.spark.df

import java.util.Date

import org.apache.commons.lang.time.DateFormatUtils
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}


object DfAddCols {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DeltaLakeDemo")
      .master("local")
      .getOrCreate()

    val df = spark.range(0, 5).repartition(2)
      .withColumn("new_col", col("id") + 1) //根据现有的列添加
      .withColumn("uuid", monotonically_increasing_id) //自带函数添加自增ID,分区不连续,分区内连续
      .withColumn("year", lit("2021")) //添加一列常量,主要用lit函数
      .withColumn("time", lit(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"))) //添加当前时间
      .withColumn("timestamp", lit("2021-06-16").cast("timestamp")) //转换为timestamp类型
      .withColumn("date", lit("2021-06-16").cast("date")) //转换为date类型

    df.printSchema()

    df.show()

    //用zipWithIndex重建DF,分区连续
    val rows = df.rdd.zipWithIndex.map { case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq) }
    val dfWithPK = spark.createDataframe(rows, StructType(StructField("pk", LongType, false) +: df.schema.fields))

    //用zipWithUniqueId重建DF
    val rows_2 = dfWithPK.rdd.zipWithUniqueId.map { case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq) }
    val dfWithPK_2 = spark.createDataframe(rows_2, StructType(StructField("pk_2", LongType, false) +: dfWithPK.schema.fields))

    dfWithPK_2.show()

    //通过窗口函数排序
    val w = Window.orderBy("id")
    dfWithPK_2.repartition(2).withColumn("pk_3", row_number().over(w)).show()

    spark.stop()
  }
}

运行结果
 |-- id: long (nullable = false)
 |-- new_col: long (nullable = false)
 |-- uuid: long (nullable = false)
 |-- year: string (nullable = false)
 |-- time: string (nullable = false)
 |-- timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)

+---+-------+----------+----+-------------------+-------------------+----------+
| id|new_col|      uuid|year|               time|          timestamp|      date|
+---+-------+----------+----+-------------------+-------------------+----------+
|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
+---+-------+----------+----+-------------------+-------------------+----------+

+----+---+---+-------+----------+----+-------------------+-------------------+----------+
|pk_2| pk| id|new_col|      uuid|year|               time|          timestamp|      date|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+
|   0|  0|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   2|  1|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   4|  2|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   1|  3|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   3|  4|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+

21/06/16 11:32:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+
|pk_2| pk| id|new_col|      uuid|year|               time|          timestamp|      date|pk_3|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+
|   0|  0|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   1|
|   1|  3|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   2|
|   2|  1|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   3|
|   3|  4|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   4|
|   4|  2|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   5|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+


UDF

也可以使用自定义函数添加新列,具体可以参考Spark UDF使用详解及代码示例,各自的优劣可以自己总结

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651865.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号