Original post: https://dongkelun.com/2021/05/19/localSparkHiveWithKerberos/
Foreword

Adding columns to a DataFrame comes up often in day-to-day work, but after going a while without doing it, it's easy to forget which function to use, and looking it up again each time wastes time, so I'm summarizing the code here as a reference. This post covers:
- Deriving a column from an existing one
- Adding an auto-increasing ID
- Adding a constant column
- Adding the current time
- Casting to timestamp
- Casting to date
package com.dkl.blog.spark.df

import java.util.Date

import org.apache.commons.lang.time.DateFormatUtils
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object DfAddCols {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DeltaLakeDemo")
      .master("local")
      .getOrCreate()
    val df = spark.range(0, 5).repartition(2)
      .withColumn("new_col", col("id") + 1) // derive from an existing column
      .withColumn("uuid", monotonically_increasing_id) // built-in auto-increasing ID: contiguous within a partition, not across partitions
      .withColumn("year", lit("2021")) // add a constant column, using the lit function
      .withColumn("time", lit(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"))) // add the current time
      .withColumn("timestamp", lit("2021-06-16").cast("timestamp")) // cast to timestamp
      .withColumn("date", lit("2021-06-16").cast("date")) // cast to date
    df.printSchema()
    df.show()

    // rebuild the DataFrame with zipWithIndex: IDs are contiguous across partitions
    val rows = df.rdd.zipWithIndex.map { case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq) }
    val dfWithPK = spark.createDataFrame(rows, StructType(StructField("pk", LongType, false) +: df.schema.fields))

    // rebuild the DataFrame with zipWithUniqueId: IDs are unique but not contiguous
    val rows_2 = dfWithPK.rdd.zipWithUniqueId.map { case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq) }
    val dfWithPK_2 = spark.createDataFrame(rows_2, StructType(StructField("pk_2", LongType, false) +: dfWithPK.schema.fields))
    dfWithPK_2.show()

    // number the rows with a window function
    val w = Window.orderBy("id")
    dfWithPK_2.repartition(2).withColumn("pk_3", row_number().over(w)).show()

    spark.stop()
  }
}
Output
root
 |-- id: long (nullable = false)
 |-- new_col: long (nullable = false)
 |-- uuid: long (nullable = false)
 |-- year: string (nullable = false)
 |-- time: string (nullable = false)
 |-- timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)

+---+-------+----------+----+-------------------+-------------------+----------+
| id|new_col|      uuid|year|               time|          timestamp|      date|
+---+-------+----------+----+-------------------+-------------------+----------+
|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
+---+-------+----------+----+-------------------+-------------------+----------+

+----+---+---+-------+----------+----+-------------------+-------------------+----------+
|pk_2| pk| id|new_col|      uuid|year|               time|          timestamp|      date|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+
|   0|  0|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   2|  1|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   4|  2|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   1|  3|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   3|  4|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+

21/06/16 11:32:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+
|pk_2| pk| id|new_col|      uuid|year|               time|          timestamp|      date|pk_3|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+
|   0|  0|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   1|
|   1|  3|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   2|
|   2|  1|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   3|
|   3|  4|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   4|
|   4|  2|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   5|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+

UDF
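The ID values above follow simple, documented formulas: monotonically_increasing_id puts the partition index in the upper bits (partition k starts at k * 2^33, which is why 8589934592 appears for partition 1), zipWithIndex assigns one global 0-based index in partition order, and zipWithUniqueId gives item i of partition k the value i * numPartitions + k. The sketch below reproduces the numbers in the output with plain arithmetic (no Spark needed); the two-partition layout [0, 2, 4] / [1, 3] is taken from the output above, and this is an illustration of the formulas, not Spark's actual implementation:

```python
# 5 rows split into 2 partitions the way repartition(2) laid them out above.
partitions = [[0, 2, 4], [1, 3]]
n = len(partitions)

# monotonically_increasing_id: (partition index << 33) + position within partition
mono = [(k << 33) + i for k, part in enumerate(partitions)
        for i, _ in enumerate(part)]

# zipWithIndex: a single global 0-based index in partition order
with_index = list(range(sum(len(p) for p in partitions)))

# zipWithUniqueId: item i of partition k gets i * numPartitions + k
unique = [i * n + k for k, part in enumerate(partitions)
          for i, _ in enumerate(part)]

print(mono)        # [0, 1, 2, 8589934592, 8589934593] -> the uuid column
print(with_index)  # [0, 1, 2, 3, 4]                   -> the pk column
print(unique)      # [0, 2, 4, 1, 3]                   -> the pk_2 column
```

This also explains the trade-offs seen above: monotonically_increasing_id and zipWithUniqueId need no coordination between partitions but leave gaps, while zipWithIndex and row_number give contiguous IDs at the cost of an extra pass or, for the window, shuffling everything into a single partition.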
You can also add a new column with a user-defined function; for details see my post Spark UDF使用详解及代码示例 (Spark UDF usage explained with code examples). You can weigh the pros and cons of each approach for yourself.



