您可以
lag如下使用窗口功能
from pyspark.sql.functions import lag, colfrom pyspark.sql.window import Windowdf = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])w = Window().partitionBy().orderBy(col("id"))df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()## +---+---+-------+## | id|num|new_col|## +---+---+-------|## | 2|3.0| 5.0|## | 3|7.0| 3.0|## | 4|9.0| 7.0|## +---+---+-------+但是有一些重要的问题:
- 如果您需要全局操作(不被其他一个或多个其他列分区),则效率极低。
- 您需要一种自然的方式来订购数据。
尽管第二个问题几乎从来都不是问题,但第一个问题可以成为破坏交易的方法。如果是这种情况,您应该简单地将其转换
Dataframe为RDD并
lag手动进行计算。



