Spark SQL的聚合函数中有first, last函数,从字面意思就是根据分组获取第一条和最后一条记录的值,实际上,只在local模式下,你可以得到满意的答案,但是在生产环境(分布式)时,这个是不能保证的。看源码的解释:
如何保证first, last是有效呢?表要排好序的,同时只能用一个分区处理,再用一个reducer来聚合。。。
所以,在多分区场景不能用first, last函数求得聚合的第一条和最后一条数据。
解决方案:利用Window。
val spark = SparkSession.builder().master("local").appName("Demo").getOrCreate()
import spark.implicits._
val df = Seq(("a", 10, 12345), ("a", 12, 34567), ("a", 11, 23456), ("b", 10, 55555), ("b", 8, 12348)).toDF("name", "value", "event_time")
// 定义window
val asc = Window.partitionBy("name").orderBy($"event_time")
val desc = Window.partitionBy("name").orderBy($"event_time".desc)
// 根据window生成row_number,根据row_number获取对应的数据
val firstValue = df.withColumn("rn", row_number().over(asc)).where($"rn" === 1).drop("rn")
val lastValue = df.withColumn("rn", row_number().over(desc)).where($"rn" === 1).drop("rn")
// 利用join把数据聚合一起
df.groupBy("name")
.count().as("t1")
.join(firstValue.as("fv"), "name")
.join(lastValue.as("lv"), "name")
.select($"t1.name", $"fv.value".as("first_value"), $"lv.value".as("last_value"), $"t1.count")
.show()
输出:
+----+-----------+----------+-----+ |name|first_value|last_value|count| +----+-----------+----------+-----+ | b| 8| 10| 2| | a| 10| 12| 3| +----+-----------+----------+-----+



