栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

pyspark-用户自定义函数udf

pyspark-用户自定义函数udf

在PySpark中,你用python语法建立一个函数,然后用PySpark SQL中的udf()方法在dataframe中使用,或将其注册成udf并在sql中使用。

例1 通过select()使用UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr

""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z),StringType())

""" Converting function to UDF 
StringType() is by default hence not required """
convertUDF = udf(lambda z: convertCase(z))

df.select(col("Seqno"),convertUDF(col("Name")).alias("Name")).show()
例2 通过withColumn()使用UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def upperCase(str):
    return str.upper()
upperCaseUDF = udf(lambda z:upperCase(z),StringType())   

df.withColumn("Cureated Name", upperCaseUDF(col("Name"))).show()

例3 注册UDF并在sql中使用
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr

""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") 
     .show(truncate=False)
例4 通过注释创建UDF
@udf(returnType=StringType()) # 或者写@udf
def upperCase(str):
    return str.upper()

df.withColumn("Cureated Name", upperCase(col("Name"))).show()
特殊处理

pyspark/spark并不能保证子句按从左到右或其他固定的顺序执行。pyspark会将执行根据查询优化与规划,所以and, or, where, having表述会有副作用

""" 
No guarantee Name is not null will execute first
If convertUDF(Name) like '%John%' execute first then 
you will get runtime error
"""
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " +  
         "where Name is not null and convertUDF(Name) like '%John%'") 
     .show(truncate=False)
# 如果某些记录的name值为空,就会有问题
spark.sql("select convertUDF(Name) from NAME_TABLE2") 
     .show(truncate=False)

tips:

  1. 最好在UDF函数内部检验null而不是在外部
  2. 如果在UDF内部不能检验null,那至少使用if或者case when来在使用UDF前检验null情况
总结

例1,2,4用的是api方法(df.select(”col1”)之类),例3用的是sql方法(spark.sql(”select * from tbl1”)之类)

api方法定义udf的格式为

# 方法一 函数定义后写udf函数
def func1(str):
	return str[0]
convertUDF = udf(lambda z: func1(z),StringType())
convertUDF = udf(lambda z: func1(z))

# 方法二 在函数定义的上一行写上@udf
@udf(returnType=StringType()) # 或者写@udf
def func1(str):
	return str[0]

sql方法定义udf的格式为

# 方法一:注册udf函数
def func1(str):
	return str[0]
spark.udf.register("func1",func1)
参考文献

PySpark UDF (User Defined Function)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/680839.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号