
Spark-SQL Tutorial

Contents
  • Creating a SparkSession
  • Creating a SparkContext from the SparkSession
  • Reading data into a DataFrame
  • Creating a DataFrame manually
  • Creating a Dataset
    • Reading a text file into a Dataset
    • Building a Dataset with custom columns
    • Using a Dataset - reading a text file
    • Using a Dataset - reading from MySQL
  • Spark_On_Hive
    • standalone mode
    • hive cluster mode
      • Configuring the hive-metastore service
      • Configuring the hiveserver2 service
      • A brief look at the hive command line
      • Using a Spark program in IDEA to talk to Hive
    • The Spark-Sql service
      • Integrating Spark with Hive
      • Spark-Sql
  • spark-sql functions
    • User-defined functions

Creating a SparkSession
val session = SparkSession
      .builder()
      .config(new SparkConf())
      .appName("test01") // not needed here if already set on the SparkConf
      .master("local")   // not needed here if already set on the SparkConf
      // .enableHiveSupport()   // with this enabled, Spark SQL on Hive supports DDL; without it, Spark only has its own catalog
      .getOrCreate()
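
As the comments note, the master and app name can instead be set on the SparkConf up front; a minimal sketch (the names are illustrative):

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("test01")

val session = SparkSession
  .builder()
  .config(conf)
  .getOrCreate()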
Creating a SparkContext from the SparkSession
val sc: SparkContext = session.sparkContext
sc.setLogLevel("ERROR")
Reading data into a DataFrame
    // {"name":"zhangsan","age":20}...
    val spark = SparkSession.builder().config(new SparkConf()).master("local").appName("hello").getOrCreate()
    val frame = spark.read.json("file:///Users/jinxingguang/java_project/bigdata3/spark-demo/data/json")
    frame.show()
    println(frame.count())
    import spark.implicits._
    frame.filter($"age" >=20).show()
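
The same filter can also be expressed as SQL by registering the DataFrame as a temporary view; a minimal sketch reusing spark and frame from above (the view name is illustrative):

frame.createTempView("people") // register with the catalog
spark.sql("select name, age from people where age >= 20").show()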
Creating a DataFrame manually
  • Method 1: an RDD of Row + a StructType
person.txt
chauncy 18
lisa 22
yiyun 99

// Create a DataFrame
    // data + metadata == DataFrame, which behaves like a table
    // Method 1: an RDD of Row + a StructType
    // 1. the data: RDD[Row], one Row per input line
    val rdd = sc.textFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/person.txt")
    val rddRow: RDD[Row] = rdd.map(_.split(" ")).map(arr => Row.apply(arr(0), arr(1).toInt))

    // 2. the metadata: StructType
    // column types / field definitions
    val fields = Array[StructField](
      StructField.apply("name", DataTypes.StringType, nullable = true),
      StructField.apply("age", DataTypes.IntegerType, nullable = true)
    )
    val schema = StructType.apply(fields) // the table definition

    // create the DataFrame
    val dataframe = session.createDataFrame(rddRow, schema)

    dataframe.show()
    dataframe.printSchema() // print the schema
    dataframe.createTempView("person") // register the table name with the catalog via the session
    session.sql("select name from person").show()
  • Dynamically building the DataFrame
person.txt
chauncy 18 0
lisa 22 1
yiyun 99 1

// Build the DataFrame dynamically
val rdd = sc.textFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/person.txt")

// declared type of each column
val userSchema = Array(
  "name string",
  "age int",
  "sex int"
)

// convert a (value, columnIndex) pair to its declared type
def toDataType(col: (String, Int)) = {
  userSchema(col._2).split(" ")(1) match {
    case "string" => col._1.toString
    case "int" => col._1.toInt
  }
}

// 1. the Row RDD
// rdd.map(_.split(" ")).map(line => Row.apply(line(0),line(1).toInt)) // hard-coded version
val rddRow: RDD[Row] = rdd.map(_.split(" "))
  .map(x => x.zipWithIndex) // [(chauncy,0), (18,1)]
  .map(x => x.map(toDataType(_)))
  .map(line => Row.fromSeq(line)) // a Row holds many columns; each must carry the correct type

// 2. the StructType
// helper: map a declared type name to a Spark DataType
def getDataType(v: String) = {
  v match {
    case "string" => DataTypes.StringType
    case "int" => DataTypes.IntegerType
  }
}
// field definitions
val fields: Array[StructField] = userSchema.map(_.split(" ")).map(x => StructField.apply(x(0), getDataType(x(1))))
val schema: StructType = StructType.apply(fields)
// schema and schema01 are equivalent
val schema01: StructType = StructType.fromDDL("name string,age int,sex int")

val dataframe = session.createDataFrame(rddRow, schema01)
dataframe.show()
dataframe.printSchema()
// register with the catalog via the session
dataframe.createTempView("user")

session.sql("select * from user").show()
  • Method 2: an RDD of beans + a JavaBean class
// the JavaBean-style class
import scala.beans.BeanProperty

class Person extends Serializable {
    @BeanProperty
    var name: String = ""
    @BeanProperty
    var age: Int = 0
    @BeanProperty
    var sex: Int = 0
}


-------
// Method 2: an RDD of beans + a JavaBean class
val rdd = sc.textFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/person.txt")
val person = new Person // created outside the map, so Person must extend Serializable
// 1. MR/Spark pipeline iteration: one record flies through memory at a time -> each record is read / computed / serialized in turn
// 2. distributed execution: the computation logic is serialized by the Driver and shipped to Executors running in other JVMs
val rddBean: RDD[Person] = rdd.map(_.split(" ")).map(arr => {
  //      val person = new Person
  person.setName(arr(0))
  person.setAge(arr(1).toInt)
  person.setSex(arr(2).toInt)
  person
})
val dataframe = session.createDataFrame(rddBean, classOf[Person])
dataframe.show()
dataframe.printSchema()
// register with the catalog via the session
dataframe.createTempView("user")

session.sql("select * from user").show()
 
Creating a Dataset
Reading a text file into a Dataset
  case class User(name:String,age:BigInt) extends Serializable
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config(new SparkConf()).master("local").appName("hello").getOrCreate()
    import spark.implicits._
    // {"name":"zhangsan","age":20}...
    val data = spark.read.json("file:///Users/jinxingguang/java_project/bigdata3/spark-demo/data/json").as[User]
    data.show()
    data.filter($"age" >=20).show()
  }
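
Because data is a typed Dataset[User], the same filter can also be written against the case class fields rather than a column expression; a one-line sketch:

data.filter(_.age >= 20).show()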
Building a Dataset with custom columns
    // A Spark Dataset can be operated on as a collection (RDD-style) or through SQL
val rddData: Dataset[String] = session.read.textFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/person.txt")
import  session.implicits._
val person: Dataset[(String, Int)] = rddData.map(
  line => {
    val strs = line.split(" ")
    (strs(0), strs(1).toInt)
  }
)

// attach column names for the table
val cPerson = person.toDF("name", "age")
cPerson.show()
cPerson.printSchema()
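
As the comment above says, the same data can be handled either collection-style or through SQL; a short sketch continuing from person and cPerson (the view name is illustrative):

// typed, collection-style operation
person.filter(_._2 >= 18).show()

// SQL-style operation through the catalog
cPerson.createTempView("persons")
session.sql("select name, age from persons where age >= 18").show()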
Using a Dataset - reading a text file
val conf = new SparkConf().setMaster("local").setAppName("sql hive")
val session = SparkSession
.builder()
.config(conf)
//      .enableHiveSupport()
.getOrCreate()
val sc = session.sparkContext
sc.setLogLevel("Error")

import  session.implicits._
val dataDF: Dataframe = List(
  "hello world",
  "hello world",
  "hello msb",
  "hello world",
  "hello world",
  "hello spark",
  "hello world",
  "hello spark"
).toDF("line") // the single column is named "line"


dataDF.createTempView("ooxx") // register with the catalog

val df: DataFrame = session.sql("select * from ooxx")
df.show()
df.printSchema()

// word count, the SQL way
//    session.sql(" select word,count(1) from  (select explode(split(line,' ')) word from ooxx) as tt group by word").show()

// word count, the API way; the DataFrame plays the role of "from table"
val subTab = dataDF.selectExpr("explode(split(line,' ')) word")
val dataset: RelationalGroupedDataset = subTab.groupBy("word")
val res = dataset.count()


// save the result as parquet
res.write.mode(SaveMode.Append).parquet("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/out/ooxx")

// read the parquet data back
val frame: DataFrame = session.read.parquet("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/out/ooxx")
frame.show()
frame.printSchema()


Using a Dataset - reading from MySQL
// val conf = new SparkConf().setMaster("local").setAppName("mysql")
val session = SparkSession
.builder()
.master("local")
.appName("mysql")
.config("spark.sql.shuffle.partitions", "1")  //默认会有100并行度的参数
//      .enableHiveSupport()
.getOrCreate()
val sc = session.sparkContext
// sc.setLogLevel("ERROR")
sc.setLogLevel("INFO")

val properties = new Properties()
properties.put("url","jdbc:mysql://192.168.7.17:3306/mysql_test")
properties.put("user","xxx")
properties.put("password","xxx")
properties.put("driver","com.mysql.jdbc.Driver")




val usersDF: Dataframe = session.read.jdbc(properties.get("url").toString,"student",properties)
val scoreDF: Dataframe = session.read.jdbc(properties.get("url").toString,"score",properties)

usersDF.createTempView("userstab")
scoreDF.createTempView("scoretab")

val resDF: DataFrame = session.sql("select userstab.s_id, userstab.s_name, scoretab.s_score from userstab join scoretab on userstab.s_id = scoretab.s_id")
resDF.show()
resDF.printSchema()
// without the setting above, the shuffle parallelism observed here was 100:
// 21/10/13 07:47:05 INFO DAGScheduler: Submitting 100 missing tasks from ResultStage 11
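
To round the example out, the joined result could be written back over the same JDBC connection; a minimal sketch (the target table name student_score is illustrative, and SaveMode comes from org.apache.spark.sql):

resDF.write
  .mode(SaveMode.Append)
  .jdbc(properties.get("url").toString, "student_score", properties)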
Spark_On_Hive
standalone mode
Maven dependencies for the project (pom.xml):

        
            org.scala-lang
            scala-library
            2.11.12
        
        
            org.apache.hadoop
            hadoop-client
            2.6.5
        
        
            org.apache.hadoop
            hadoop-hdfs
            2.6.5
        
        
            org.apache.spark
            spark-core_2.11
            2.3.4
            
            
                
                    org.apache.hadoop
                    hadoop-client
                

                
                    org.scala-lang
                    scala-library
                
            
        
        
            org.apache.spark
            spark-hive_2.11
            2.3.4
        
        
            org.apache.spark
            spark-sql_2.11
            2.3.4
        
        
            org.apache.spark
            spark-streaming_2.11
            2.3.4
        

        
            org.apache.spark
            spark-streaming-kafka-0-10_2.11
            2.3.4
        

        
            org.apache.spark
            spark-hive_2.11
            2.3.4
        

        
            mysql
            mysql-connector-java
            5.1.27
        
    
    
    
        
            
                org.scala-tools
                maven-scala-plugin
                2.15.2
                
                    
                        scala-compile-first
                        
                            compile
                        
                        
                            
                                **/*.scala
                            
                        
                    
                    
                        scala-test-compile
                        
                            testCompile
                        
                    
                
            
        
    
val ss: SparkSession = SparkSession
.builder()
.master("local")
.appName("standalone hive")
.config("spark.sql.shuffle.partitions", 1)
.config("spark.sql.warehouse.dir", "file:///Users/jinxingguang/java_project/bigdata-chauncy/spark/warehouse")
.enableHiveSupport()   // enable Hive support; in this mode Spark starts its own embedded Hive metastore
.getOrCreate()
val sc: SparkContext = ss.sparkContext
//    sc.setLogLevel("ERROR")
//    ss.sql("create table xxx(name string,age int)")
//    ss.sql("insert into xxx values ('zhangsan',18),('lisi',22)")
ss.sql("select * from xxx").show()
ss.catalog.listTables().show()

// databases are supported as well
ss.sql("create database chauncy_db")
ss.sql("use chauncy_db")
ss.sql("create table meifute(name string,age int)")
ss.catalog.listTables().show()
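
The catalog API can also be used to check what was just created; a short sketch:

ss.catalog.listDatabases().show()          // should list default and chauncy_db
ss.catalog.listTables("chauncy_db").show() // tables registered under chauncy_db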
hive cluster mode
Configuring the hive-metastore service
hive-site.xml on the metastore node (node01):

<configuration>
    <!-- warehouse directory -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive_remote/warehouse</value>
    </property>
    <!-- metastore database -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://192.168.7.17:3306/hive_remote?createDatabaseIfNotExist=true</value>
        <description>database connection</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>meifute</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>meifute</value>
    </property>
</configuration>


############################################
On the other hive nodes (these can run the hive CLI or the hiveserver2 service):
node02, node03, node04

<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node01:9083</value>
        <description>metastore address</description>
    </property>
</configuration>

Configuring the hiveserver2 service

<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node01:9083</value>
        <description>metastore address</description>
    </property>
</configuration>
A brief look at the hive command line
######## the hive command line
hive --service --help

Start the metastore:
hive --service metastore

Start the hive CLI on node02:
hive --service cli     # same as running just: hive
show databases;
use default;
show tables;

Start hiveserver2 on node03:
hive --service hiveserver2     # same as running: hiveserver2

Connect to hiveserver2 from node04 with beeline:
beeline -u jdbc:hive2://node03:10000/default -n god
Using a Spark program in IDEA to talk to Hive
val ss: SparkSession = SparkSession
  .builder()
  .appName("cluster on hive")
  .master("local")
  .config("hive.metastore.uris", "thrift://node01:9083")
  .enableHiveSupport()
  .getOrCreate()
val sc: SparkContext = ss.sparkContext
sc.setLogLevel("ERROR")

ss.sql("create database IF NOT EXISTS spark_hive ")
ss.sql("use spark_hive")
ss.catalog.listTables().show() // this threw an error until the HBase-backed table was dropped from Hive:
// Class org.apache.hadoop.hive.hbase.HbaseSerDe not found


import ss.implicits._

val df01: DataFrame = List(
  "zhangsan",
  "lisi"
).toDF("name")
df01.createTempView("ooxx") // register the view with the catalog via the session

// create a table and insert data with SQL
ss.sql("create table IF NOT EXISTS hddffs ( id int,age int)") // DDL
// requires the hadoop core-site.xml and hdfs-site.xml configs on the classpath
ss.sql("insert into hddffs values (4,3),(8,4),(9,3)") // DML; Spark writes the data to HDFS itself

ss.sql("show tables").show() // ooxx is only a temporary view, not persisted to Hive
df01.write.saveAsTable("oxox") // persist the data into Hive as table oxox, not just a temp catalog registration
ss.sql("show tables").show() // oxox now appears as a real Hive table
The Spark-Sql service
Integrating Spark with Hive
  • Spark configuration
Only the metastore address needs to be configured:
cat > /opt/bigdata/spark-2.3.4/conf/hive-site.xml <<-EOF
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node01:9083</value>
        <description>metastore address</description>
    </property>
</configuration>
EOF
  • Starting the services with the Spark commands
Start spark-shell:
cd /opt/bigdata/spark-2.3.4/bin
./spark-shell --master yarn
scala> spark.sql("show tables").show


Start spark-sql:
cd /opt/bigdata/spark-2.3.4/bin
./spark-sql --master yarn
Open http://node03:8088/cluster and a SparkSQL application appears.
SQL can be executed directly; the metastore is shared with Hive, so tables created on either side are visible to both.
spark-sql> show tables;
Spark-Sql
# expose a JDBC service that accepts SQL
cd /opt/bigdata/spark-2.3.4/sbin
./start-thriftserver.sh --master yarn
Open http://node03:8088/cluster  **a new Thrift JDBC/ODBC Server application appears**


# connect with Spark's beeline
/opt/bigdata/spark-2.3.4/bin/beeline -u jdbc:hive2://node01:10000/default -n god

Prints: Connected to: Spark SQL (version 2.3.4)

/usr/lib/spark-current/bin/beeline -u jdbc:hive2://localhost:10000 -n hadoop
show tables;
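
The same Thrift endpoint can also be reached from code with the Hive JDBC driver; a minimal sketch, assuming the hive-jdbc dependency is on the classpath and the connection details used with beeline above:

import java.sql.DriverManager

// older hive-jdbc versions may need the driver registered explicitly
Class.forName("org.apache.hive.jdbc.HiveDriver")

val conn = DriverManager.getConnection("jdbc:hive2://node01:10000/default", "god", "")
val rs = conn.createStatement().executeQuery("show tables")
while (rs.next()) println(rs.getString(1)) // print the first column of each result row
conn.close()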
spark-sql functions

Official documentation for version 2.3.4

org/apache/spark/sql/functions.scala is the corresponding Scala source

User-defined functions
package com.chauncy.spark_dataframe_dataset

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object MySpark_Sql_functions {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession
      .builder()
      .appName("functions")
      .master("local")
      .getOrCreate()
    ss.sparkContext.setLogLevel("ERROR")


    import ss.implicits._
    // turn the List into a DataFrame
    val dataDF: DataFrame = List(
      ("A", 1, 67),
      ("D", 1, 87),
      ("B", 1, 54),
      ("D", 2, 24),
      ("C", 3, 64),
      ("R", 2, 54),
      ("E", 1, 74)
    ).toDF("name", "class", "score")

    dataDF.createTempView("users")

    // grouping, ordering and aggregation
    //    ss.sql("select name,sum(score) " +
    //      " from users " +
    //      "group by name" +
    //      " order by name").show()
    ss.sql("select * from users order by name asc,score desc").show()

    // udf: an ordinary user-defined function
    ss.udf.register("ooxx", (x: Int) => {
      x * 10
    })
    ss.sql("select *,ooxx(score) mut_10 from users ").show()

    // user-defined aggregate function (UDAF)
    //    class MyAggFun extends UserDefinedAggregateFunction
    ss.udf.register("myagg", new MyAvgFun)
    ss.sql("select name," +
      " myagg(score) " +
      " from users " +
      " group by name").show()
  }

  class MyAvgFun extends UserDefinedAggregateFunction {
    // type of the input column(s)
    override def inputSchema: StructType = {
      // myagg(score)
      StructType.apply(Array(StructField.apply("score", IntegerType, false)))
    }

    // intermediate aggregation buffer: running sum and count
    override def bufferSchema: StructType = {
      StructType.apply(
        Array(
          StructField.apply("sum", IntegerType, false),
          StructField.apply("count", IntegerType, false)
        )
      )
    }

    // type of the final result
    override def dataType: DataType = DoubleType
    // deterministic: the same input always produces the same output
    override def deterministic: Boolean = true
    // initialize the aggregation buffer
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0
      buffer(1) = 0
    }
    // called once per input row
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      // within a group, invoked once per record
      buffer(0) = buffer.getInt(0) + input.getInt(0) // sum
      buffer(1) = buffer.getInt(1) + 1
    }
    // merge partial buffers (e.g. across partitions or spills)
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
      buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
    }
    // the final result; convert to Double to avoid integer division
    override def evaluate(buffer: Row): Double = {
      buffer.getInt(0).toDouble / buffer.getInt(1)
    }
  }
}
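
As a sanity check, the result of the custom UDAF should match Spark's built-in avg; a one-line comparison to run inside the same main:

ss.sql("select name, avg(score) from users group by name").show()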
