最干净的解决方案是使用闭包传递其他参数:
def make_topic_word(topic_words): return udf(lambda c: label_maker_topic(c, topic_words))df = sc.parallelize([(["union"], )]).toDF(["tokens"])(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens"))) .show())这不需要更改
keyword_list或使用UDF包装功能。您也可以使用此方法传递任意对象。例如,这可以用于传递
sets有效查找的列表。
如果要使用当前的UDF并
topic_words直接传递,则必须先将其转换为列文字:
from pyspark.sql.functions import array, litks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()根据您的数据和要求,可以选择其他更有效的解决方案,这些解决方案不需要UDF(爆炸+聚合+折叠)或查找(散列+向量运算)。



