栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021.12.13内置函数、自定义函数UDF、UDAF、UDTF

2021.12.13内置函数、自定义函数UDF、UDAF、UDTF

目录

内置函数InnerFunction

  自定义函数UDF、UDAF、UDTF


 

内置函数InnerFunction

package cn.kgc.function


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Built-in function demo: computes daily PV (page views, `count`) and
 * UV (unique visitors, `countDistinct`) from a small in-memory access log.
 *
 * Fixes vs. original: Spark's type is `DataFrame` and the builder method is
 * `createDataFrame` — the original `Dataframe`/`createDataframe` do not compile.
 */
object InnerFunction {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("innerdemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    val sc: SparkContext = spark.sparkContext

    // Simulated access log; each record is "date,userId".
    val accessLog = Array(
      "2016-12-27,001",
      "2016-12-27,001",
      "2016-12-27,002",
      "2016-12-28,003",
      "2016-12-28,004",
      "2016-12-28,002",
      "2016-12-28,002",
      "2016-12-28,001"
    )

    // Parse each record into Row(day: String, userid: Int).
    val rdd: RDD[Row] = sc.parallelize(accessLog).map(x => {
      val strings: Array[String] = x.split(",")
      Row(strings(0), strings(1).toInt)
    })

    // Schema must match the Row layout built above.
    val schema: StructType = StructType(Array(
      StructField("day", StringType),
      StructField("userid", IntegerType)
    ))

    val frame: DataFrame = spark.createDataFrame(rdd, schema)

    frame.printSchema()
    frame.show()

    import org.apache.spark.sql.functions._

    // PV: total hits per day.
    frame.groupBy("day").agg(count("userid").as("pv")).select("day", "pv").show()
    // UV: distinct users per day.
    frame.groupBy("day").agg(countDistinct("userid").as("uv")).select("day", "uv").show()

    // Release the local SparkSession (the original leaked it).
    spark.stop()
  }

}
package cn.kgc.function

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}


// Simple student record used by the aggregation demos in this package.
case class Student(id: Integer, name: String, gender: String, age: Integer)

/**
 * Built-in aggregate demo on a DataFrame built from case-class instances:
 * grouping with avg/max/min/count and ordering by an aggregate.
 *
 * Fixes vs. original: `DataFrame` / `createDataFrame` spelling (the original
 * `Dataframe` / `createDataframe` do not compile).
 */
object InnerFunctionDemo2 {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("innerdemo2").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    val sc: SparkContext = spark.sparkContext

    import spark.implicits._

    val students: Seq[Student] = Seq(
      Student(1, "zhangsan", "F", 22),
      Student(2, "lisi", "M", 38),
      Student(3, "wangwu", "M", 13),
      Student(4, "zhaoliu", "F", 17),
      Student(5, "songba", "M", 32),
      Student(6, "sunjiu", "M", 16),
      Student(7, "qianshiyi", "F", 17),
      Student(8, "yinshier", "F", 15),
      Student(9, "fangshisan", "M", 12),
      Student(10, "yeshisan", "F", 11),
      Student(11, "ruishiyi", "F", 26),
      Student(12, "chenshier", "M", 28)
    )

    // Two ways to turn a Seq into a DataFrame: 1. toDF  2. spark.createDataFrame
//    val frame: DataFrame = students.toDF()
//    frame.printSchema()
//    frame.show()

    val frame: DataFrame = spark.createDataFrame(students)
//    frame.printSchema()

    import org.apache.spark.sql.functions._

//    // Average age per gender:
//    // select gender, avg(age) from student group by gender
//    frame.groupBy("gender").agg(avg("age").as("avgage")).show()
//
//    // select avg(age) as avgage from student
//    frame.agg(avg("age")).show()
//
//    // select gender, avg(age), max(age), min(age) from student group by gender
//    frame.groupBy("gender")
//      .agg(avg("age"), max("age"), min("age")).show()
//    frame.groupBy("gender")
//      .agg("age" -> "avg", "age" -> "max", "age" -> "min").show()
//
//    // select count(*) from students group by gender, age
//    frame.groupBy("gender", "age").count().show()
//
//    // select age from student order by age desc / asc
//    frame.sort(desc("age")).show()
//    frame.sort($"age".desc).show()
//    frame.sort('age.desc).show()

    // select avg(age) from students group by gender order by avg(age) desc
    frame.groupBy("gender").agg(avg("age")).orderBy(avg("age").desc).show()

    // Release the local SparkSession (the original leaked it).
    spark.stop()
  }

}

  自定义函数UDF、UDAF、UDTF

package cn.kgc.function

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}


// One user plus a comma-separated list of hobbies.
case class Hobbies(name: String, hobbies: String)

/**
 * UDF demo: one value in, one value out.
 *
 * Shows the two registration styles — `spark.udf.register` (visible only to
 * SQL text) and `functions.udf` (usable only through the DataFrame API) —
 * applied to the same "count the hobbies" logic.
 *
 * Fixes vs. original: `DataFrame` spelling (the original `Dataframe` does not
 * compile).
 */
object UDFDemo {
  def main(args: Array[String]): Unit = {

    // NOTE(review): appName "innerdemo2" looks copy-pasted from the previous demo;
    // kept as-is to preserve runtime behavior.
    val conf: SparkConf = new SparkConf().setAppName("innerdemo2").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    val sc: SparkContext = spark.sparkContext

    import spark.implicits._

    // Each line of in/hobbies.txt is expected to be "name hobby1,hobby2,...".
    val hobbyDF: DataFrame = sc.textFile("in/hobbies.txt").map(x => x.split(" "))
      .map(x => Hobbies(x(0), x(1))).toDF()
    hobbyDF.printSchema()
    hobbyDF.show(truncate = false)

    hobbyDF.createOrReplaceTempView("hobby")
    // Registered this way, hobby_num can only be called from SQL text.
    spark.udf.register("hobby_num", (x: String) => x.split(",").length)

    spark.sql("select name,hobbies,hobby_num(hobbies) as hobbynum from hobby")
      .show(truncate = false)

    spark.sql("select name from hobby where hobbies like 'travel%'").show(false)
    spark.sql("select if(hobbies like 'travel%',1,0) from hobby").show(false)

    println("-------------------------------------")

    import org.apache.spark.sql.functions
    // Defined this way, hobby_num can only be used through the DataFrame API.
    val hobby_num: UserDefinedFunction =
      functions.udf((hobbies: String) => hobbies.split(",").length)
    val newHobbyDF: DataFrame = hobbyDF.withColumn("hobbynum", hobby_num($"hobbies"))
    newHobbyDF.printSchema()
    newHobbyDF.show(truncate = false)

    println("-----------------------------------------")

    // Returns 1 when the user's first hobby is exactly "travel", else 0.
    spark.udf.register("hobby_equ", (x: String) => {
      val parts: Array[String] = x.split(",")
      if (parts(0).equals("travel")) 1 else 0
    })

    spark.sql("select name,hobbies,hobby_equ(hobbies) as equ from hobby")
      .show(false)

    // Release the local SparkSession (the original leaked it).
    spark.stop()
  }

}
package cn.kgc.function

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * UDAF demo: many rows in, one value out.
 *
 * Registers [[MyAgeAvgFunction]] under the SQL name "myAvg" and uses it to
 * average the `age` column of a students table.
 *
 * Fixes vs. original: `DataFrame` / `createDataFrame` spelling (the original
 * `Dataframe` / `createDataframe` do not compile); dropped the unused
 * `functions._` import.
 */
object UDAFDemo {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("innerdemo3").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    val sc: SparkContext = spark.sparkContext

    import spark.implicits._

    val students: Seq[Student] = Seq(
      Student(1, "zhangsan", "F", 22),
      Student(2, "lisi", "M", 38),
      Student(3, "wangwu", "M", 13),
      Student(4, "zhaoliu", "F", 17),
      Student(5, "songba", "M", 32),
      Student(6, "sunjiu", "M", 16),
      Student(7, "qianshiyi", "F", 17),
      Student(8, "yinshier", "F", 15),
      Student(9, "fangshisan", "M", 12),
      Student(10, "yeshisan", "F", 11),
      Student(11, "ruishiyi", "F", 26),
      Student(12, "chenshier", "M", 28)
    )

    // Two ways to turn a Seq into a DataFrame: 1. toDF  2. spark.createDataFrame
    //    val frame: DataFrame = students.toDF()
    //    frame.printSchema()
    //    frame.show()

    val frame: DataFrame = spark.createDataFrame(students)
    //    frame.printSchema()

    // Register the custom aggregate so SQL text can call it as myAvg(...).
    spark.udf.register("myAvg", new MyAgeAvgFunction)
    frame.createOrReplaceTempView("students")

    val resultDF: DataFrame = spark.sql("select myAvg(age) from students")
    resultDF.printSchema()
    resultDF.show()

    // Release the local SparkSession (the original leaked it).
    spark.stop()
  }

}

/**
 * Custom UDAF (extends UserDefinedAggregateFunction) computing an average age.
 *
 * The aggregation buffer carries a running (sum, count) pair; the final
 * result divides the two to produce a Double.
 */
class MyAgeAvgFunction extends UserDefinedAggregateFunction {

  // Input schema: a single LongType column named "age".
  override def inputSchema: StructType =
    StructType(StructField("age", LongType) :: Nil)

  // Buffer schema: running total of ages ("sum") and number of rows ("count").
  override def bufferSchema: StructType =
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)

  // Result type: sum / count yields a Double.
  override def dataType: DataType = DoubleType

  // Same input always produces the same output.
  override def deterministic: Boolean = true

  // Start every buffer at (sum = 0, count = 0).
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0L)
    buffer.update(1, 0L)
  }

  // Fold one input row into the buffer: add its age and bump the row count.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer.update(0, buffer.getLong(0) + input.getLong(0))
    buffer.update(1, buffer.getLong(1) + 1L)
  }

  // Combine two partial buffers, e.g. results from different partitions.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
    buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
  }

  // Final result: total age divided by number of rows.
  override def evaluate(buffer: Row): Any =
    buffer.getLong(0) / buffer.getLong(1).toDouble
}
package cn.kgc.function

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * UDTF demo (one row in, many rows out) backed by Hive support.
 *
 * Loads "id//name//class" style records, registers [[MyUDTF]] as a temporary
 * Hive function, and explodes the `class` column into one row per token.
 *
 * Fixes vs. original: `DataFrame` spelling (the original `Dataframe` does not
 * compile).
 */
object UDTFDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("udtfdemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder()
      .config(conf)
      .config("hive.metastore.uris", "thrift://192.168.111.131:9083")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    val sc: SparkContext = spark.sparkContext
    val rdd: RDD[String] = sc.textFile("in/UDTF.txt")

    // NOTE(review): records are split on the literal "//" delimiter — confirm
    // the input file really uses "//" between fields and not a single space.
    val rdd2: RDD[(String, String, String)] = rdd.map(x => x.split("//"))
      .filter(x => x(1).equals("ls"))
      .map(x => (x(0), x(1), x(2)))
    val frame: DataFrame = rdd2.toDF("id", "name", "class")
    frame.printSchema()
    frame.show(truncate = false)

    frame.createOrReplaceTempView("udtftable")

    // Register the UDTF with Hive under the SQL name Myudtf.
    spark.sql("create temporary function Myudtf as 'cn.kgc.function.MyUDTF'")

    // Each space-separated token in `class` becomes its own output row.
    spark.sql("select Myudtf(class) from udtftable").show(truncate = false)

    // Release the local SparkSession (the original leaked it).
    spark.stop()
  }

}

/**
 * Hive generic UDTF that splits a space-separated string argument into one
 * output row per token, emitted in a single string column named "type".
 */
class MyUDTF extends GenericUDTF {

  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
    val outputNames = new java.util.ArrayList[String]()
    val outputInspectors = new java.util.ArrayList[ObjectInspector]()

    // Declare the single output column: "type", of Java String type.
    outputNames.add("type")
    outputInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
    ObjectInspectorFactory.getStandardStructObjectInspector(outputNames, outputInspectors)
  }

  override def process(objects: Array[AnyRef]): Unit = {
    // Split the first argument on spaces and forward each token as its own row.
    val tokens: Array[String] = objects(0).toString.split(" ")
    tokens.foreach { token =>
      forward(Array(token))
    }
  }

  // No resources to release.
  override def close(): Unit = {}

}

 

 

 

 

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/662809.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号