Let's make your data a little more interesting, so that the windows actually contain some events:
```scala
import org.apache.spark.sql.functions.to_date

val df = sc.parallelize(Seq(
  (1, "a", "2014-12-30", "2015-01-01", 100),
  (2, "a", "2014-12-21", "2015-01-02", 150),
  (3, "a", "2014-12-10", "2015-01-03", 120),
  (4, "b", "2014-12-05", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
  .withColumn("dateIns", to_date($"dateIns"))
  .withColumn("dateTrans", to_date($"dateTrans"))
```

What you need is more or less this:
```scala
import org.apache.spark.sql.functions.{col, datediff, lit, sum}

// Find the difference in tens of days
val diff = (datediff(col("dateTrans"), col("dateIns")) / 10)
  .cast("integer") * 10

val dfWithDiff = df.withColumn("diff", diff)

val aggregated = dfWithDiff
  .where((col("diff") < 30) && (col("diff") >= 0))
  .groupBy(col("prodId"), col("diff"))
  .agg(sum(col("value")))
```

The result:
```scala
aggregated.show
// +------+----+----------+
// |prodId|diff|sum(value)|
// +------+----+----------+
// |     a|  20|       120|
// |     b|  20|       100|
// |     a|   0|       100|
// |     a|  10|       150|
// +------+----+----------+
```
where `diff` is the lower bound of each range (0 -> [0, 10), 10 -> [10, 20), …). If you drop the `val`s and adjust the imports, this will also work in PySpark.
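The bucketing expression is just integer division scaled back up to the bucket's lower bound. A minimal plain-Python sketch of the same arithmetic (no Spark required; the `bucket` helper is hypothetical, introduced only for illustration):

```python
from datetime import date

def bucket(date_ins, date_trans, width=10):
    # Lower bound of the bucket containing the day difference,
    # mirroring (datediff(dateTrans, dateIns) / width).cast("integer") * width
    diff = (date_trans - date_ins).days
    return (diff // width) * width

print(bucket(date(2014, 12, 30), date(2015, 1, 1)))   # 0  -> [0, 10)
print(bucket(date(2014, 12, 21), date(2015, 1, 2)))   # 10 -> [10, 20)
print(bucket(date(2014, 12, 10), date(2015, 1, 3)))   # 20 -> [20, 30)
```

One caveat: Python's `//` floors toward negative infinity while Spark's `cast("integer")` truncates toward zero, so the two disagree for negative differences. That doesn't matter here because the `where` clause filters out `diff < 0` anyway.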
Edit (aggregated per column):
```scala
import org.apache.spark.sql.functions.when

val exprs = Seq(0, 10, 20).map(x => sum(
    when(col("diff") === lit(x), col("value"))
      .otherwise(lit(0)))
  .alias(x.toString))

dfWithDiff.groupBy(col("prodId")).agg(exprs.head, exprs.tail: _*).show
// +------+---+---+---+
// |prodId|  0| 10| 20|
// +------+---+---+---+
// |     a|100|150|120|
// |     b|  0|  0|100|
// +------+---+---+---+
```

The Python equivalent:
```python
from pyspark.sql.functions import *

def make_col(x):
    cnd = when(col("diff") == lit(x), col("value")).otherwise(lit(0))
    return sum(cnd).alias(str(x))

exprs = [make_col(x) for x in range(0, 30, 10)]

dfWithDiff.groupBy(col("prodId")).agg(*exprs).show()
## +------+---+---+---+
## |prodId|  0| 10| 20|
## +------+---+---+---+
## |     a|100|150|120|
## |     b|  0|  0|100|
## +------+---+---+---+
```
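The conditional-sum-with-alias trick above is essentially a manual pivot: each bucket becomes a column, and rows contribute to a column only when their `diff` matches. A plain-Python sketch of the same shape, using the already-bucketed rows from the earlier output (for illustration only, no Spark required):

```python
# (prodId, diff, value) rows after bucketing, as in the aggregated output above
rows = [
    ("a", 0, 100), ("a", 10, 150), ("a", 20, 120), ("b", 20, 100),
]

buckets = [0, 10, 20]
pivoted = {}
for prod, diff, value in rows:
    # One accumulator dict per product, pre-filled with 0 for every
    # bucket column -- this mirrors the .otherwise(lit(0)) default
    acc = pivoted.setdefault(prod, {b: 0 for b in buckets})
    acc[diff] += value

print(pivoted)
# {'a': {0: 100, 10: 150, 20: 120}, 'b': {0: 0, 10: 0, 20: 100}}
```

Note that since Spark 1.6 the built-in `pivot` does the same thing directly, e.g. `dfWithDiff.groupBy("prodId").pivot("diff", Seq(0, 10, 20)).sum("value")`, though missing cells come back as `null` rather than 0.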


