栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

sparksql转换和自定义函数操作

sparksql转换和自定义函数操作

sparksql的操作
  • Spark SQL 中 RDD、DataFrame、Dataset 的创建与相互转换
  • 自定义UDAF函数
    • 弱类型:继承UserDefinedAggregateFunction
    • 强类型:继承Aggregator(org.apache.spark.sql.expressions.Aggregator)

Spark SQL 中 RDD、DataFrame、Dataset 的创建与相互转换
package spark_sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * One record of `datas/a.json`. Field names must match the JSON keys so that `df.as[Emp]` can
 * bind columns by name (presumably `age` is inferred as a JSON integer, i.e. Long — confirm
 * against the data file).
 */
case class Emp(name: String, age: Long)

/**
 * Demonstrates creating a DataFrame, a Dataset and RDDs in Spark SQL and converting between them.
 *
 * Uses an explicit `main` instead of `extends App`: Spark's documentation warns that
 * subclasses of `scala.App` may not work correctly.
 */
object spark_sql_json {

  def main(args: Array[String]): Unit = {
    val ss: SparkSession = SparkSession.builder().master("local[*]").appName("sql").getOrCreate()
    try {
      // Brings toDS() and the encoders required by `as[Emp]` into scope.
      import ss.implicits._

      // read.json already returns a DataFrame, so no extra toDF() is needed.
      val df: DataFrame = ss.read.json("datas/a.json")
      println("df")
      df.show()

      // DataFrame -> Dataset: columns are matched to Emp's fields by name.
      val ds: Dataset[Emp] = df.as[Emp]
      println("ds")
      ds.show()

      // DataFrame -> RDD[Row] (untyped rows).
      val df_rdd: RDD[Row] = df.rdd
      println("df.rdd")
      df_rdd.collect().foreach(println)

      // Dataset -> RDD[Emp] (typed objects).
      val rdd: RDD[Emp] = ds.rdd
      println("ds.rdd")
      rdd.collect().foreach(println)

      // RDD -> Dataset; show it so the printed header is followed by the converted data.
      println("rdd.toDS")
      rdd.toDS().show()
    } finally {
      // Release the session even if reading or any conversion fails.
      ss.stop()
    }
  }
}
自定义UDAF函数 弱类型:继承UserDefinedAggregateFunction
package sparkSql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/** Record shape of the user data. The SQL query below reads the JSON schema directly and does not use this class. */
case class user(name: String, age: Long)

/**
 * Registers the untyped aggregate `myUDAF` under the SQL name `udaf` and applies it per `name`.
 */
object sparkSqlUdf {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    try {
      val df: DataFrame = spark.read.json("datas/sql")

      // Make the aggregate callable from SQL as `udaf(...)`.
      spark.udf.register("udaf", new myUDAF())

      df.createOrReplaceTempView("user")

      spark.sql("select name , udaf(age)+1 as newAge from user group by name").show()
    } finally {
      // Release the session even if reading or the query fails.
      spark.stop()
    }
  }
}

/**
 * Untyped (Row-based) aggregate that computes the arithmetic mean of one numeric column.
 *
 * `UserDefinedAggregateFunction` is deprecated since Spark 3.0.0 in favour of an `Aggregator`
 * registered through `functions.udaf`.
 * As with SQL `avg`, null inputs are skipped. A group with no non-null input yields null
 * instead of NaN.
 */
class myUDAF extends UserDefinedAggregateFunction {
  // Input: a single column, read as a double.
  override def inputSchema: StructType =
    StructType(Array(StructField("age", DoubleType)))

  // Buffer for computing the mean: running sum (index 0) and count of non-null values (index 1).
  override def bufferSchema: StructType =
    StructType(Array(StructField("total", DoubleType), StructField("count", DoubleType)))

  // Type of the value returned by evaluate.
  override def dataType: DataType = DoubleType

  // The same input always produces the same output.
  override def deterministic: Boolean = true

  // Start every buffer at sum = 0, count = 0.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0D)
    buffer.update(1, 0D)
  }

  // Fold one input row into the buffer. Null inputs are skipped: getDouble throws on a null
  // value, and skipping matches the null handling of SQL avg.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer.update(0, buffer.getDouble(0) + input.getDouble(0))
      buffer.update(1, buffer.getDouble(1) + 1)
    }
  }

  // Combine two partial buffers by adding their sums and counts.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
    buffer1.update(1, buffer1.getDouble(1) + buffer2.getDouble(1))
  }

  // Mean = sum / count. Return null rather than NaN (0.0 / 0.0) when no value was counted.
  override def evaluate(buffer: Row): Any = {
    val count = buffer.getDouble(1)
    if (count == 0D) null else buffer.getDouble(0) / count
  }
}
强类型:继承Aggregator(org.apache.spark.sql.expressions.Aggregator)
package sparkSql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, Row, SparkSession, functions}

/**
 * Registers the strongly typed aggregator `UDAF` under the SQL name `udaf` and applies it per `name`.
 */
object sparkSqlUdf_new {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    try {
      val df: DataFrame = spark.read.json("datas/sql")

      // Since Spark 3.0.0 the deprecation note reads: "Aggregator[IN, BUF, OUT] should now be
      // registered as a UDF via the functions.udaf(agg) method." functions.udaf wraps the
      // Aggregator into a UDF that spark.udf.register accepts.
      spark.udf.register("udaf", functions.udaf(new UDAF()))
      df.createOrReplaceTempView("user")
      spark.sql("select name , udaf(age)+1 as newAge from user group by name").show()
    } finally {
      // Release the session even if reading or the query fails.
      spark.stop()
    }
  }
}

/**
 * Strongly typed aggregation buffer: running sum and row count.
 * The fields are `var` because reduce/merge update the buffer in place and return it.
 */
case class Buff(var sum: Long, var cnt: Long)

/**
 * Strongly typed average over Long inputs (input Long, buffer `Buff`, output Long).
 *
 * The output is a Long, so the mean is truncated toward zero by integer division
 * (e.g. the values 3 and 4 give 3).
 * An aggregation that saw no rows returns 0 instead of throwing ArithmeticException.
 */
class UDAF extends Aggregator[Long, Buff, Long] {
  // Initial (empty) buffer.
  override def zero: Buff = Buff(0L, 0L)

  // Fold one input value into the buffer.
  override def reduce(buff: Buff, a: Long): Buff = {
    buff.cnt = buff.cnt + 1L
    buff.sum = buff.sum + a
    buff
  }

  // Combine two partial buffers.
  override def merge(buff1: Buff, buff2: Buff): Buff = {
    buff1.sum = buff1.sum + buff2.sum
    buff1.cnt = buff1.cnt + buff2.cnt
    buff1
  }

  // Truncated mean. Guard cnt == 0 so that an empty aggregation does not divide by zero.
  override def finish(reduction: Buff): Long =
    if (reduction.cnt == 0L) 0L else reduction.sum / reduction.cnt

  // Encoders: product encoder for the case-class buffer, scalaLong for the Long result.
  override def bufferEncoder: Encoder[Buff] = Encoders.product
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/335337.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号