本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
目录
本专栏目录结构和参考文献请见《1000个问题搞定大数据技术体系》
Spark SQL functions.scala 源码解析(一)Sort functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(二)Aggregate functions(基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(三)Window functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(四)Non-aggregate functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(五)Math Functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(六)Misc functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(七)String functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(八)DateTime functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(九)Collection functions (基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(十)Partition transform functions(基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(十一)Scala UDF functions(基于 Spark 3.3.0)
Spark SQL functions.scala 源码解析(十二)Java UDF functions(基于 Spark 3.3.0)
正文
md5
/**
 * Returns a Column built by passing the Catalyst `Md5` expression, applied to
 * `e`'s underlying expression, through `withExpr`.
 *
 * The usage output in this article shows a hex-looking string for both a string
 * column and a binary column.
 *
 * @param e input column
 */
def md5(e: Column): Column = withExpr { Md5(e.expr) }
用法
========== df.select(md5($"a"), md5($"b")).show() ==========
+--------------------+--------------------+
| md5(a)| md5(b)|
+--------------------+--------------------+
|902fbdd2b1df0c4f7...|6ac1e56bc78f03105...|
+--------------------+--------------------+
sha1
/**
 * Returns a Column built by passing the Catalyst `Sha1` expression, applied to
 * `e`'s underlying expression, through `withExpr`.
 *
 * The usage output in this article shows a hex-looking string for both a string
 * column and a binary column.
 *
 * @param e input column
 */
def sha1(e: Column): Column = withExpr { Sha1(e.expr) }
用法
========== df.select(sha1($"a"), sha1($"b")).show() ==========
+--------------------+--------------------+
| sha1(a)| sha1(b)|
+--------------------+--------------------+
|3c01bdbb26f358bab...|5d211bad8f4ee70e1...|
+--------------------+--------------------+
sha2
/**
 * Returns a Column built from the Catalyst `Sha2` expression over `e`'s
 * underlying expression and a literal holding `numBits`.
 *
 * @param e       input column
 * @param numBits must be one of 0, 224, 256, 384 or 512
 * @throws IllegalArgumentException (via `require`) when `numBits` is not one of
 *                                  the permitted values
 */
def sha2(e: Column, numBits: Int): Column = {
// Reject unsupported bit lengths eagerly, before any expression is built.
require(Seq(0, 224, 256, 384, 512).contains(numBits),
s"numBits $numBits is not in the permitted values (0, 224, 256, 384, 512)")
// numBits is handed to the expression as a literal column's expression.
withExpr { Sha2(e.expr, lit(numBits).expr) }
}
用法
========== df.select(sha2($"a", 256), sha2($"b", 256)).show() ==========
+--------------------+--------------------+
| sha2(a, 256)| sha2(b, 256)|
+--------------------+--------------------+
|b5d4045c3f466fa91...|7192385c3c0605de5...|
+--------------------+--------------------+
crc32
/**
 * Returns a Column built by passing the Catalyst `Crc32` expression, applied to
 * `e`'s underlying expression, through `withExpr`.
 *
 * The usage output in this article shows a non-negative integer result for both
 * a string column and a binary column.
 *
 * @param e input column
 */
def crc32(e: Column): Column = withExpr { Crc32(e.expr) }
用法
========== df.select(crc32($"a"), crc32($"b")).show() ==========
+----------+----------+
| crc32(a)| crc32(b)|
+----------+----------+
|2743272264|2180413220|
+----------+----------+
hash
/**
 * Returns a Column built from a `Murmur3Hash` expression over the underlying
 * expressions of all given columns.
 *
 * Annotated with `@scala.annotation.varargs`, which makes the Scala compiler
 * also emit a Java-style varargs forwarder for this method.
 *
 * @param cols columns whose expressions are passed, in order, to `Murmur3Hash`
 */
@scala.annotation.varargs
def hash(cols: Column*): Column = withExpr {
new Murmur3Hash(cols.map(_.expr))
}
用法
========== df.select(hash($"a"), hash($"b")).show() ==========
+----------+----------+
| hash(a)| hash(b)|
+----------+----------+
|-757602832|-707439997|
+----------+----------+
xxhash64
/**
 * Returns a Column built from an `XxHash64` expression over the underlying
 * expressions of all given columns.
 *
 * Annotated with `@scala.annotation.varargs`, which makes the Scala compiler
 * also emit a Java-style varargs forwarder for this method.
 *
 * @param cols columns whose expressions are passed, in order, to `XxHash64`
 */
@scala.annotation.varargs
def xxhash64(cols: Column*): Column = withExpr {
new XxHash64(cols.map(_.expr))
}
用法
========== df.select(xxhash64($"a"), xxhash64($"b")).show() ==========
+-------------------+-------------------+
| xxhash64(a)| xxhash64(b)|
+-------------------+-------------------+
|4105715581806190027|6863832527287464158|
+-------------------+-------------------+
assert_true
/**
 * Returns a Column built from an `AssertTrue` expression over `c`'s underlying
 * expression, using the single-argument `AssertTrue` constructor (no
 * caller-supplied error message).
 *
 * In this article's sample run the result is null when the condition holds, and
 * the generated column name carries a default "... is not true!" message.
 *
 * @param c the condition column to assert on
 */
def assert_true(c: Column): Column = withExpr {
new AssertTrue(c.expr)
}
/**
 * Returns a Column built from an `AssertTrue` expression over `c`'s underlying
 * expression, with `e`'s expression passed as the second constructor argument.
 *
 * In this article's sample run, a false condition with `e = lit("a >= b")`
 * surfaces as `java.lang.RuntimeException: a >= b`, raised from
 * `RaiseError.eval` according to the stack trace.
 *
 * @param c the condition column to assert on
 * @param e column supplying the error message
 */
def assert_true(c: Column, e: Column): Column = withExpr {
new AssertTrue(c.expr, e.expr)
}
用法
========== Seq((0, 1)).toDF("a", "b").select(assert_true($"a" < $"b")).show() ==========
+----------------------------------------------+
|assert_true((a < b), '('a < 'b)' is not true!)|
+----------------------------------------------+
| null|
+----------------------------------------------+
========== Seq((1, 0)).toDF("a", "b").select(assert_true($"a" < $"b", lit("a >= b"))).show() ==========
Exception in thread "main" java.lang.RuntimeException: a >= b
at org.apache.spark.sql.catalyst.expressions.RaiseError.eval(misc.scala:89)
at org.apache.spark.sql.catalyst.expressions.If.eval(conditionalexpressions.scala:71)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedexpressions.scala:168)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.$anonfun$applyOrElse$80(Optimizer.scala:1840)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1840)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1835)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:447)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1835)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1833)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:138)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
at com.shockang.study.spark.sql.functions.MiscFunctionsExample$.main(MiscFunctionsExample.scala:52)
at com.shockang.study.spark.sql.functions.MiscFunctionsExample.main(MiscFunctionsExample.scala)
raise_error
/**
 * Returns a Column built from the Catalyst `RaiseError` expression over `c`'s
 * underlying expression.
 *
 * In this article's sample run, evaluating it with a null string literal fails
 * with a `java.lang.RuntimeException` that carries no message.
 *
 * @param c column supplying the error message
 */
def raise_error(c: Column): Column = withExpr {
RaiseError(c.expr)
}
用法
========== df.select(raise_error(lit(null.asInstanceOf[String]))).show() ==========
Exception in thread "main" java.lang.RuntimeException
at org.apache.spark.sql.catalyst.expressions.RaiseError.eval(misc.scala:87)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedexpressions.scala:168)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.$anonfun$applyOrElse$80(Optimizer.scala:1840)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1840)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1835)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:447)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1835)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1833)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:138)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
at com.shockang.study.spark.sql.functions.MiscFunctionsExample$.main(MiscFunctionsExample.scala:56)
at com.shockang.study.spark.sql.functions.MiscFunctionsExample.main(MiscFunctionsExample.scala)
实践
代码
package com.shockang.study.spark.sql.functions
import com.shockang.study.spark.util.Utils.formatPrint
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
 * Runnable walkthrough of the "misc" functions exposed by
 * `org.apache.spark.sql.functions`: md5, sha1, sha2, crc32, hash, xxhash64,
 * assert_true and raise_error.
 *
 * Each demo first prints the snippet it is about to run (via `formatPrint`) and
 * then executes that same snippet against a local SparkSession.
 */
object MiscFunctionsExample {

  /**
   * Entry point. Starts a local SparkSession, runs the demos in order and always
   * stops the session, even though the failing `assert_true` demo throws.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Silence Spark's own logging so that only the demo output is printed.
    Logger.getLogger("org").setLevel(Level.OFF)

    val spark = SparkSession.builder()
      .appName("MiscFunctionsExample")
      .master("local[*]")
      .getOrCreate()

    // The failing demos below throw on purpose. Without try/finally the
    // trailing spark.stop() would never be reached and the session would leak.
    try {
      import spark.implicits._

      // One row: a string column "a" and a binary column "b".
      val df = Seq(("ABC", Array[Byte](1, 2, 3, 4, 5, 6))).toDF("a", "b")

      // md5
      formatPrint("""df.select(md5($"a"), md5($"b")).show()""")
      df.select(md5($"a"), md5($"b")).show()

      // sha1
      formatPrint("""df.select(sha1($"a"), sha1($"b")).show()""")
      df.select(sha1($"a"), sha1($"b")).show()

      // sha2
      formatPrint("""df.select(sha2($"a", 256), sha2($"b", 256)).show()""")
      df.select(sha2($"a", 256), sha2($"b", 256)).show()

      // crc32
      formatPrint("""df.select(crc32($"a"), crc32($"b")).show()""")
      df.select(crc32($"a"), crc32($"b")).show()

      // hash
      formatPrint("""df.select(hash($"a"), hash($"b")).show()""")
      df.select(hash($"a"), hash($"b")).show()

      // xxhash64
      formatPrint("""df.select(xxhash64($"a"), xxhash64($"b")).show()""")
      df.select(xxhash64($"a"), xxhash64($"b")).show()

      // assert_true (condition holds, so a row is shown)
      formatPrint("""Seq((0, 1)).toDF("a", "b").select(assert_true($"a" < $"b")).show()""")
      Seq((0, 1)).toDF("a", "b").select(assert_true($"a" < $"b")).show()

      // Each of the last two demos throws. The exception is deliberately left
      // uncaught, so this first failure aborts the run; to see the raise_error
      // output, comment out the failing assert_true demo.
      formatPrint("""Seq((1, 0)).toDF("a", "b").select(assert_true($"a" < $"b", lit("a >= b"))).show()""")
      Seq((1, 0)).toDF("a", "b").select(assert_true($"a" < $"b", lit("a >= b"))).show()

      // raise_error
      formatPrint("""df.select(raise_error(lit(null.asInstanceOf[String]))).show()""")
      df.select(raise_error(lit(null.asInstanceOf[String]))).show()
    } finally {
      spark.stop()
    }
  }
}
输出
========== df.select(md5($"a"), md5($"b")).show() ==========
+--------------------+--------------------+
| md5(a)| md5(b)|
+--------------------+--------------------+
|902fbdd2b1df0c4f7...|6ac1e56bc78f03105...|
+--------------------+--------------------+
========== df.select(sha1($"a"), sha1($"b")).show() ==========
+--------------------+--------------------+
| sha1(a)| sha1(b)|
+--------------------+--------------------+
|3c01bdbb26f358bab...|5d211bad8f4ee70e1...|
+--------------------+--------------------+
========== df.select(sha2($"a", 256), sha2($"b", 256)).show() ==========
+--------------------+--------------------+
| sha2(a, 256)| sha2(b, 256)|
+--------------------+--------------------+
|b5d4045c3f466fa91...|7192385c3c0605de5...|
+--------------------+--------------------+
========== df.select(crc32($"a"), crc32($"b")).show() ==========
+----------+----------+
| crc32(a)| crc32(b)|
+----------+----------+
|2743272264|2180413220|
+----------+----------+
========== df.select(hash($"a"), hash($"b")).show() ==========
+----------+----------+
| hash(a)| hash(b)|
+----------+----------+
|-757602832|-707439997|
+----------+----------+
========== df.select(xxhash64($"a"), xxhash64($"b")).show() ==========
+-------------------+-------------------+
| xxhash64(a)| xxhash64(b)|
+-------------------+-------------------+
|4105715581806190027|6863832527287464158|
+-------------------+-------------------+
========== Seq((0, 1)).toDF("a", "b").select(assert_true($"a" < $"b")).show() ==========
+----------------------------------------------+
|assert_true((a < b), '('a < 'b)' is not true!)|
+----------------------------------------------+
| null|
+----------------------------------------------+
========== Seq((1, 0)).toDF("a", "b").select(assert_true($"a" < $"b", lit("a >= b"))).show() ==========
Exception in thread "main" java.lang.RuntimeException: a >= b
at org.apache.spark.sql.catalyst.expressions.RaiseError.eval(misc.scala:89)
at org.apache.spark.sql.catalyst.expressions.If.eval(conditionalexpressions.scala:71)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedexpressions.scala:168)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.$anonfun$applyOrElse$80(Optimizer.scala:1840)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1840)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1835)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:447)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1835)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1833)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:138)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
at com.shockang.study.spark.sql.functions.MiscFunctionsExample$.main(MiscFunctionsExample.scala:52)
at com.shockang.study.spark.sql.functions.MiscFunctionsExample.main(MiscFunctionsExample.scala)



