例如,您可以先基于原始数据计算统计信息,再通过 join 将其合并回原始数据:
# Per-group statistics: population standard deviation and mean of TranAmount,
# keyed by the grouping column "dsc".
stats = (
    df.groupBy("dsc")
    .agg(
        func.stddev_pop("TranAmount").alias("sd"),
        func.avg("TranAmount").alias("avg"),
    )
)

# Join the (small) stats table back onto the raw rows and compute the z-score.
# func.broadcast hints Spark to broadcast `stats` to every executor, avoiding a
# shuffle of the large `df` side of the join.
normalized = (
    df.join(func.broadcast(stats), ["dsc"])
    .select("dsc", "TranAmount", (df.TranAmount - stats.avg) / stats.sd)
)

# Alternatively, use a window function with the standard-deviation formula
# (see the next snippet).
from pyspark.sql.window import Window
import sys


def z_score_w(col, w):
    """Return the z-score of *col* over window *w*.

    Uses the identity Var(X) = E[X^2] - E[X]^2 to derive the standard
    deviation from two windowed averages, then standardizes the column.
    """
    avg_ = func.avg(col).over(w)
    avg_sq = func.avg(col * col).over(w)
    sd_ = func.sqrt(avg_sq - avg_ * avg_)
    return (col - avg_) / sd_


# Unbounded window per "dsc" group: -sys.maxsize / sys.maxsize cover the
# whole partition (equivalent to unboundedPreceding/unboundedFollowing).
w = Window().partitionBy("dsc").rowsBetween(-sys.maxsize, sys.maxsize)
df.withColumn("zscore", z_score_w(df.TranAmount, w))


