栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkSQL-----各种方式加载DataFrame

SparkSQL-----各种方式加载DataFrame

1.SparkSQL介绍
SaprkSQL完全脱离了Hive的限制,能够在Scala中写SQL语句。支持简单的SQL语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用。

2. Spark on Hive和Hive on Spark
Spark on Hive: Hive只作为储存角色,Spark负责sql解析优化,执行。
Hive on Spark:Hive即作为存储又负责sql的解析优化,Spark负责执行。

3.Dataframe
Dataframe也是一个分布式数据容器。与RDD类似,然而Dataframe更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息。

4.使用SparkSQL需要引入的依赖


      org.apache.spark
      spark-sql_2.11
      2.3.1
    

5.读取json格式的文件创建Dataframe
(1)方式1:read.json(文件路径)
show()方法不传参默认输出前20个数据,show(100)就是前100条数据
printschema()输出的是属性的信息
常用的那些配置语句可以省略成一句
(2)方式2:read.format(“json”).load(文件路径)

package zjc.bigdata

import org.apache.spark.sql.{Dataframe, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object sparkSQL01 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("sparkSQL01").master("local").getOrCreate()
    //val frame = sqlContext.read.json("D:\BigData\spark\filterWC\src\main\data\datajson")
    val frame: Dataframe = session.read.format("json").load("D:\BigData\spark\filterWC\src\main\data\datajson")
    frame.show()    //显示前20行数据
    frame.printSchema()
  }
}

运行结果:并且元数据为name,age,结果是age,name,原因:列会按照Ascii码排序

(3)SparkSQL根据条件进行增删改查
即SparkSQL的Dataframe API操作

1//select name ,age from table
	frame.select("name","age").show(100)
	
2.	//select name,age + 10 as addage from table
	frame.select(frame.col("name"),frame.col("age").plus(10).as("addage")).show(100)
	
3.	//select name,age from table where age >= 19
	frame.select("name","age").where(frame.col("age").>=(19)).show(100)
	或
	frame.filter("age>=19").show(100)
	
4.	//select name ,age from table order by name asc ,age desc
	frame.sort($"name".asc,frame.col("age").desc).show(100)
	
5.	//select name ,age from table where age is not null
	frame.filter("age is not null").show()
	
6.	
	frame.createTempView("mytable")
	frame.createOrReplaceTempView("mytable")
	frame.createGlobalTempView("mytable")
	frame.createOrReplaceGlobalTempView("mytable")
	
7.	
	val rdd: RDD[Row] = frame.rdd
	rdd.foreach(row=>{
	  val name = row.getAs[String]("name")
	  val age = row.getAs[Long]("age")
	  println(s"name is $name ,age is $age")
})

也可以直接进行sql查询
frame.sql(sql语句)

6.读取json格式的RDD加载Dataframe

object sparkSQL02 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
    val list: List[String] = List[String](
      "{'name':zhangjiacheng,'age':18}",
      "{'name':lff,'age':18}",
      "{'name':lfe,'age':19}",
      "{'name':lffd,'age':20}",
      "{'name':lfg,'age':11}",
      "{'name':lfy,'age':12}"
    )
    import session.implicits._  //隐式转换,不改变原来的功能基础上增减功能
    val value: Dataset[String] = list.toDS()   //toDS()转换成DataSet,对应的toDF转成Dataframe
    value.show()
  }
}

7.非json格式的RDD创建Dataframe
(1)方式1:通过反射的方式将非json格式的RDD转换成Dataframe(不建议)
(2)方式2:动态创建Schema将非json格式的RDD转换成Dataframe

object sparkSQL03 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("sparkSQL03").master("local").getOrCreate()
    val peopleInfo: RDD[String] = session.sparkContext.textFile("D:\BigData\spark\filterWC\src\main\data\people")
    val peopleRDD: RDD[Row] = peopleInfo.map(info => {
      val id = info.split(",")(0).toInt
      val name = info.split(",")(1)
      val age = info.split(",")(2).toInt
      val score = info.split(",")(3).toInt
      Row(id, name, age, score)  //Row是个类型,表示一行数据
    })
    //组织structType结构化字段
    val structType=StructType(Array[StructField](
     StructField("id",IntegerType), 
     StructField("name",StringType), 
     StructField("age",IntegerType), 
     StructField("score",IntegerType),
    ))

    //创建Dataframe
    val frame: Dataframe = session.createDataframe(peopleRDD, structType)
    frame.createTempView("mytable")
    session.sql("select * from mytable")
  }
}

8.读取parquet文件创建Dataframe

parquet文件准备+读取

object sparkSQL04 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("sparkSQL04").getOrCreate()
    //读取json格式的文件,加载Dataframe
    val jsonframe: Dataframe = session.read.json("D:\BigData\spark\filterWC\src\main\data\people")
    //将json结果保存成parquet格式文件
    jsonframe.write.mode(SaveMode.Overwrite).format("parquet").save("D:\BigData\spark\filterWC\src\main\data\parquete01")
    //读取parquet格式的文件加载Dataframe
    val parquetframe: Dataframe = session.read.parquet("D:\BigData\spark\filterWC\src\main\data\parquete01")
    //将结果保存成parquet格式的数据
    parquetframe.write.parquet("D:\BigData\spark\filterWC\src\main\data\parquete02")
  }
}

9.通过mysql数据创建Dataframe

object sparkSQL05 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("sparkSQL05").master("local").getOrCreate()
    val properties: Properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","333")
    
    val frame: Dataframe = session.read.jdbc("jdbc:myssql://localhost:3306/spark", "people", properties)
    frame.createTempView("people")

    
    //将数据库配置参数配置在map中
    val map = Map[String, String](
      "user" -> "root",
      "pasword" -> "333",
      "url" -> "jdbc:mysql://localhost:3306/spark",
      "driver" -> "com.jdbc.mysql.Driver",
      "dbtable" -> "people"
    )
    val df2: Dataframe = session.read.format("jdbc").options(map).load()
    df2.show()

    
    val result: Dataframe = session.sql("select * from people")
    result.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark","result",properties)
  }
}

10.

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/604558.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号