栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark自定义UDF

spark自定义UDF

自定义udf都是大同小异

udf返回Array
  1. udf可以直接返回null只需返回值为null

  2. 不可变的 val s1 = Set("a","b","c") s1 ++: "a" 反回结果类型为Set; 如果需要则要用s1 ++: Set("a")

  3. udf 返回Array 需要 XXX.toSeq()

   spark.udf.register("COMBINE_KEYWORD", (goodsName: String, keywords: String) => {
      if (!(goodsName == null && keywords == null)) {
        if (goodsName == null && keywords != null) {
          val keywordsFilter = keywords.split("[\\||丨|、|;]").map(_.trim.toLowerCase).filter(_.length > 1)

          if (keywordsFilter.size > 0) keywordsFilter.toSet.toSeq else null
        } else if (goodsName != null && keywords == null) {
          Set(goodsName).toSeq
        } else {
          val keywordList = keywords.split("[\\||丨|、|;]").map(_.trim.toLowerCase).filter(_.length > 1).toSet
          val goodsStandName = goodsName.trim.toLowerCase

          if (keywordList.contains(goodsStandName)) {
            keywordList.toSeq
          } else {
            (keywordList ++: Set(goodsStandName)).toSeq
          }
        }
      } else {
        null
      }
    })

即使用object静态方式传udf函数,还是会提示UserTagsParams未序列化.
case class UserTagsParams(
                           weightBrand: Double,
                           weightCategory: Double,
                           weightConsumeOnLine: Double,
                           weightConsumeOffLine: Double,
                           weigthTimeDecay: Double,
                           weigthPv: Double,
                           weigthUv: Double,
                           weightAddCart: Double,
                           weightRemoveCart: Double,
                           limitDate: String,
                           memberGroup: List[String],
                           numPartitions: Int
                         )

object UserTagsParams {
  private var mLimitDayNum: Int = 0

  def setLimitDayNum(limitDayNum: Int) = this.mLimitDayNum = limitDayNum

  def limitDayNum: Int = this.mLimitDayNum
}

spark.udf.register("WEIBULL_DECAY", (t: Int) => {
  //      UserTagsParams.limitDayNum
  val totalNum = UserTagsParams.limitDayNum
  //      val totalNum = 30
  val initScore = 0.8
  val maxScore = 0.2

  val alpha = math.log(initScore / maxScore) / totalNum
  val l = -math.log(initScore) / alpha
  val decayScore = math.exp(-alpha * (t + l))

  decayScore
})


即使用object静态方式传udf函数,还是会提示UserTagsParams未序列化.
就算UserTagsParams extends Serializable 还是会有这样的提示

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:839)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:839)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:371)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
  ...
  ... 56 elided
Caused by: java.io.NotSerializableException: UserTagsParams$
Serialization stack:
	- object not serializable (class: UserTagsParams$, value: UserTagsParams$@364dfe1e)
	- field (class: $iw, name: UserTagsParams$module, type: class UserTagsParams$)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/316435.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号