该
udf没有的列名是什么知识。因此,它会检查
if/
elif块中的每个条件,并且所有条件的计算结果均为
False。因此函数将返回
None。
您必须将您的代码重写
udf为要检查的列:
from pyspark.sql.functions import udffrom pyspark.sql.types import StringTypedef get_profile(foo, bar, baz): if foo == 1: return 'Foo' elif bar == 1: return 'Bar' elif baz == 1 : return 'Baz'spark_udf = udf(get_profile, StringType())spark_df = spark_df.withColumn('get_profile',spark_udf('Foo', 'Bar', 'Baz'))spark_df.show()#+---+---+---+-----------+#|Foo|Bar|Baz|get_profile|#+---+---+---+-----------+#| 0| 1| 0| Bar|#| 1| 0| 0| Foo|#| 1| 1| 1| Foo|#+---+---+---+-----------+如果您有很多列,并希望全部传递(按顺序):
spark_df = spark_df.withColumn('get_profile', spark_udf(*spark_df.columns))更一般而言,您可以解压缩任何有序的列列表:
cols_to_pass_to_udf = ['Foo', 'Bar', 'Baz']spark_df = spark_df.withColumn('get_profile', spark_udf(*cols_to_pass_to_udf ))但是此特定操作不需要
udf。我会这样:
from pyspark.sql.functions import coalesce, when, col, litspark_df.withColumn( "get_profile", coalesce(*[when(col(c)==1, lit(c)) for c in spark_df.columns])).show()#+---+---+---+-----------+#|Foo|Bar|Baz|get_profile|#+---+---+---+-----------+#| 0| 1| 0| Bar|#| 1| 0| 0| Foo|#| 1| 1| 1| Foo|#+---+---+---+-----------+
之所以
pyspark.sql.functions.when()可行
null,是因为如果条件求值
False且未
otherwise指定任何值,则默认情况下将返回。然后列表理解
pyspark.sql.functions.coalesce将返回第一个非空列。
请注意,这仅等效于
udf列的顺序与
get_profile函数中评估的顺序相同的情况。更明确地说,您应该执行以下操作:
spark_df.withColumn( "get_profile", coalesce(*[when(col(c)==1, lit(c)) for c in ['Foo', 'Bar', 'Baz'])).show()



