您可以在不
udf使用的情况下执行此操作
Window。
考虑以下示例:
import pyspark.sql.functions as fdata = [ ('a', 5), ('a', 8), ('a', 7), ('b', 1), ('b', 3)]df = sqlCtx.createDataframe(data, ["A", "B"])df.show()#+---+---+#| A| B|#+---+---+#| a| 5|#| a| 8|#| a| 7|#| b| 1|#| b| 3|#+---+---+创建一个
Window按列划分的分区,
A然后使用该分区来计算每个组的最大值。然后滤除行,以使列中的值
B等于最大值。
from pyspark.sql import Windoww = Window.partitionBy('A')df.withColumn('maxB', f.max('B').over(w)) .where(f.col('B') == f.col('maxB')) .drop('maxB') .show()#+---+---+#| A| B|#+---+---+#| a| 8|#| b| 3|#+---+---+或等效地使用
pyspark-sql:
df.registerTempTable('table')q = "SELECT A, B FROM (SELECt *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERe B = maxB"sqlCtx.sql(q).show()#+---+---+#| A| B|#+---+---+#| b| 3|#| a| 8|#+---+---+


