栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark之SparkSQL

Spark之SparkSQL

什么是SparkSQL

        SparkSQL是Spark用于结构化数据处理的模块

SparkSQL的原理

SparkSQL提供了两个编程抽象,Dataframe和DataSet

Dataframe

        1)Dataframe是一种类似RDD的分布式数据集,类似于传统数据库中的二维表格。

        2)Dataframe与RDD的主要区别在于,Dataframe带有schema元信息,即Dataframe所表示的二维表数据集的每一列都带有名称和类型。

        注意: Spark SQL性能上比RDD要高。因为Spark SQL了解数据内部结构,从而对藏于Dataframe背后的数据源以及作用于Dataframe之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在Stage层面进行简单、通用的流水线优化。

DataSet

        1.DataSet是分布式数据集。

       2. DataSet是强类型的。比如可以有DataSet[Car],DataSet[User]。具有类型安全检查

        3.Dataframe是DataSet的特例,type Dataframe = DataSet[Row] ,Row是一个类型,跟Car、User这些的类型一样,所有的表结构信息都用Row来表示。

Rdd、Dataframe、DataSet的关系

(1)RDD、Dataframe、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利

(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算

(3)三者有许多共同的函数,如filter,排序等

(4)三者都会根据Spark的内存情况自动缓存运算

(5)三者都有分区的概念

三者转换图:

 //注意:在做转换之前,一定要导入隐式转换
    import spark.implicits._
    //RDD=>DF
    //普通RDD转换DF,需要手动补充列名
    val df01: Dataframe = dataRDD.toDF("name","age")
    //样例类RDD=>DF,自动会把样例类的属性名,作为列名
    val df: Dataframe = caseRDD.toDF()
    df01.show()
    df.show()

    //DF => RDD
    //DF转RDD,直接.rdd即可,但是需要注意的是DF不会保留原始数据的类型,类型统统为ROW
    val rdd01: RDD[Row] = df.rdd
    val res: RDD[User] = rdd01.map(
      row => {
        User(row.getString(0), row.getLong(1))
      }
    )
    //res.collect().foreach(println)
    //RDD=>DS
    val ds01: Dataset[(String, Long)] = dataRDD.toDS()
    val ds02: Dataset[User] = caseRDD.toDS()
    ds01.show()
    ds02.show()
    //DS => RDD
    val ds2rdd: RDD[User] = ds02.rdd
    val ds2rdd02: RDD[(String, Long)] = ds01.rdd

    ds2rdd.collect().foreach(println)

    val df2df: Dataframe = df.toDF("name2","age2")
    df2df.show()

    val df2ds: Dataset[User] = df2df.as[User]
    df2ds.show()
    //DS=>DF
    val ds2df: Dataframe = df2ds.toDF("name","age")
    ds2df.show()

自定义函数

UDF 一进一出

    val df: Dataframe = spark.read.json("E:\bigdata_study\sparkSQL\input\user.json")
    //创建Dataframe临时视图
    df.createTempView("user")

    spark.udf.register("addname",(name:String)=>{"Name:" + name})
    spark.udf.register("double",(age:Long)=>{"double:" + age * 2})
    spark.sql("select addName(name),double(age) from user").show

UDAF 输入多行,返回一行

  def main(args: Array[String]): Unit = {
    // 1 创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setAppName("SparkSQLTest").setMaster("local[*]")

    // 2 创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    val df: Dataframe = spark.read.json("E:\bigdata_study\sparkSQL\input\user.json")

    df.createTempView("user")

    spark.udf.register("myAvg",functions.udaf(new MyAvgUADF))

    spark.sql("select myAvg(age) from user").show()

    // 5 释放资源
    spark.stop()
  }
}
case class Buff(var sum:Long, var count:Double)

class MyAvgUADF extends Aggregator[Long,Buff,Double]{
  override def zero: Buff = Buff(0L,0L)
  //buff在单个分区内的聚合方法
  override def reduce(buff: Buff, age: Long): Buff = {
    buff.sum += age
    buff.count += 1
    buff
  }
  //多个buff在分区间的合并方法
  override def merge(b1: Buff, b2: Buff): Buff = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  override def finish(reduction: Buff): Double = {
    reduction.sum.toDouble/reduction.count
  }
  //序列化方法
  override def bufferEncoder: Encoder[Buff] = Encoders.product

  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

加载数据

//特定加载
val df01: Dataframe = spark.read.json("E:\bigdata_study\sparkSQL\input\user.json")
val df02: Dataframe = spark.read.csv("E:\bigdata_study\sparkSQL\input\user.txt")

//通用加载
val df03: Dataframe = spark.read.load("E:\bigdata_study\sparkSQL\input\user.json")

写出数据

    //特定
df.write.json("E:\bigdata_study\sparkSQL\input\out01")
df.write.csv("E:\bigdata_study\sparkSQL\input\out02")
    
//通用
df.write.mode(SaveMode.Ignore).save("E:\bigdata_study\sparkSQL\input\out02")

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742703.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号