Let's start with some dummy data:
```python
from pyspark.sql import Row

row = Row("v", "x", "y", "z")
df = sc.parallelize([
    row(0.0, 1, 2, 3.0), row(None, 3, 4, 5.0),
    row(None, None, 6, 7.0), row(float("NaN"), 8, 9, float("NaN"))
]).toDF()

## +----+----+---+---+
## |   v|   x|  y|  z|
## +----+----+---+---+
## | 0.0|   1|  2|3.0|
## |null|   3|  4|5.0|
## |null|null|  6|7.0|
## | NaN|   8|  9|NaN|
## +----+----+---+---+
```
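If you are on Spark 2.x, the same frame can be built directly from a `SparkSession` instead of the RDD API. A minimal sketch, assuming a `spark` session object is in scope:

```python
# Equivalent construction via SparkSession (Spark 2.x+);
# the schema is inferred as v: double, x: long, y: long, z: double
df = spark.createDataFrame(
    [(0.0, 1, 2, 3.0), (None, 3, 4, 5.0),
     (None, None, 6, 7.0), (float("NaN"), 8, 9, float("NaN"))],
    ["v", "x", "y", "z"],
)
```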
All you need is a simple aggregation:

```python
from pyspark.sql.functions import col, count, isnan, lit, sum

def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer:
    - False -> 0
    - True  -> 1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

df.agg(*[count_not_null(c) for c in df.columns]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  2|  3|  4|  4|
## +---+---+---+---+
```

Or, if you want to treat NaN as NULL:
```python
df.agg(*[count_not_null(c, True) for c in df.columns]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  1|  3|  4|  3|
## +---+---+---+---+
```
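If you need the counts on the driver rather than as a one-row DataFrame, collecting that row into a plain dict is a convenient pattern. A small sketch (not part of the original answer):

```python
# Pull the single aggregate row back as a {column: count} dict
counts = df.agg(*[count_not_null(c, True) for c in df.columns]).first().asDict()
## {'v': 1, 'x': 3, 'y': 4, 'z': 3}
```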
You can also use SQL NULL semantics to achieve the same result without writing a custom function:
```python
df.agg(*[
    count(c).alias(c)  # vertical (column-wise) operations in SQL ignore NULLs
    for c in df.columns
]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  2|  3|  4|  4|
## +---+---+---+---+
```
This does not work for NaNs, though: `COUNT` treats NaN as a regular value (the NaN in `v` is still counted), so there is no built-in way to get the NaN-as-NULL counts this way.
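If you want to stay with plain `COUNT` semantics and still exclude NaNs, one option is to map NaNs to NULL inside the aggregate. A sketch using `when`/`isnan` (not part of the original answer):

```python
from pyspark.sql.functions import when

# when(~isnan(c), col(c)) yields NULL for NaN rows, so COUNT skips them;
# this should reproduce the NaN-as-NULL counts from above
df.agg(*[
    count(when(~isnan(c), col(c))).alias(c) for c in df.columns
]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  1|  3|  4|  3|
## +---+---+---+---+
```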
If you prefer fractions:
```python
exprs = [(count_not_null(c) / count("*")).alias(c) for c in df.columns]
df.agg(*exprs).show()

## +---+----+---+---+
## |  v|   x|  y|  z|
## +---+----+---+---+
## |0.5|0.75|1.0|1.0|
## +---+----+---+---+
```

or:
```python
# COUNT(*) is equivalent to COUNT(1) so NULLs won't be an issue
df.select(*[(count(c) / count("*")).alias(c) for c in df.columns]).show()

## +---+----+---+---+
## |  v|   x|  y|  z|
## +---+----+---+---+
## |0.5|0.75|1.0|1.0|
## +---+----+---+---+
```
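As a side note, for interactive inspection `DataFrame.describe` also reports a per-column `count` row; like plain `COUNT`, it treats NaN as a value, not as missing:

```python
# The "count" row of describe() is the per-column non-NULL count
df.describe().show()
```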
The equivalent in Scala:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, isnan, lit, not, sum}

type JDouble = java.lang.Double

// in spark-shell; otherwise add: import spark.implicits._
val df = Seq[(JDouble, JDouble, JDouble, JDouble)](
  (0.0, 1.0, 2.0, 3.0), (null, 3.0, 4.0, 5.0),
  (null, null, 6.0, 7.0), (java.lang.Double.NaN, 8.0, 9.0, java.lang.Double.NaN)
).toDF()

def count_not_null(c: Column, nanAsNull: Boolean = false) = {
  val pred = c.isNotNull and (if (nanAsNull) not(isnan(c)) else lit(true))
  sum(pred.cast("integer"))
}

df.select(df.columns map (c => count_not_null(col(c)).alias(c)): _*).show

// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// |  2|  3|  4|  4|
// +---+---+---+---+

df.select(df.columns map (c => count_not_null(col(c), true).alias(c)): _*).show

// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// |  1|  3|  4|  3|
// +---+---+---+---+
```
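The fraction variant carries over the same way. A sketch, assuming the `df` and `count_not_null` defined above:

```scala
import org.apache.spark.sql.functions.count

// fraction of entries that are neither NULL nor NaN, per column
df.select(df.columns map (c =>
  (count_not_null(col(c), true) / count("*")).alias(c)
): _*).show
```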


