- 创建SparkSession
- 通过SparkSession创建SparkContext
- 读取数据并生成Dataframe实例
- 手动创建Dataframe
- 创建DataSet
- 读取文本文件并生成dataset实例
- 自定义属性生成dataset
- DataSet的使用 - 读取文本文件
- DataSet的使用 - 读取MySQL
- Spark_On_Hive
- standalone模式
- hive集群模式
- 搭建hive-metastore服务的配置
- hiveserver2服务的配置
- hive 的命令行简单介绍
- IDEA使用spark程序交互hive
- Spark-Sql服务
- SPARK整合HIVE
- Spark-Sql
- spark-sql函数
- 自定义函数
// Build (or reuse) the SparkSession in explicit steps.
// appName / master only need to be set here when the SparkConf does not already carry them.
val sessionConf = new SparkConf()
val sessionBuilder = SparkSession.builder()
  .config(sessionConf)
  .appName("test01")
  .master("local")
// sessionBuilder.enableHiveSupport()
// Author's note: with Hive support enabled, Spark SQL on Hive supports DDL;
// without it Spark only has its own catalog.
val session = sessionBuilder.getOrCreate()
通过SparkSession创建SparkContext
// Take the SparkContext that backs the session and set its log level to "ERROR".
val sc: SparkContext = session.sparkContext
sc.setLogLevel("ERROR")
读取数据并生成Dataframe实例
// Input is JSON with one record per line, e.g. {"name":"zhangsan","age":20}
val spark = SparkSession
  .builder()
  .config(new SparkConf())
  .master("local")
  .appName("hello")
  .getOrCreate()

val jsonPath = "file:///Users/jinxingguang/java_project/bigdata3/spark-demo/data/json"
val frame = spark.read.json(jsonPath)

// Print the rows, then the row count.
frame.show()
val rowCount = frame.count()
println(rowCount)

// The $"col" column syntax comes from the session's implicits.
import spark.implicits._
val atLeastTwenty = frame.filter($"age" >= 20)
atLeastTwenty.show()
手动创建Dataframe
- 方式一
person.txt
chauncy 18
lisa 22
yiyun 99
// Build a DataFrame by hand: data + metadata == DataFrame (comparable to a table).
// Approach 1: an RDD of Row plus an explicit StructType.

// 1. Data: RDD[Row], one Row per input line ("chauncy 18" -> Row("chauncy", 18)).
val rdd = sc.textFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/person.txt")
val rddRow: RDD[Row] = rdd
  .map(_.split(" "))
  .map(arr => Row(arr(0), arr(1).toInt))

// 2. Metadata: StructType, i.e. the column names and types.
val fields = Array[StructField](
  StructField("name", DataTypes.StringType, nullable = true),
  StructField("age", DataTypes.IntegerType, nullable = true)
)
val schema = StructType(fields) // the table definition

// Combine both halves. The method is createDataFrame (capital F);
// `createDataframe` is not a member of SparkSession and does not compile.
val dataframe = session.createDataFrame(rddRow, schema)
dataframe.show()
dataframe.printSchema() // print the column names and types

// Register the table name in the session catalog so it can be queried with SQL.
dataframe.createTempView("person")
session.sql("select name from person").show()
- 动态封装Dataframe
person.txt
chauncy 18 0
lisa 22 1
yiyun 99 1
// Build a DataFrame whose column conversion is driven by a schema description
// instead of being hard-coded per column.
val rdd = sc.textFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/person.txt")

// Per-column contract, "<name> <type>", in the same order as the columns of the file.
val userSchema = Array(
  "name string",
  "age int",
  "sex int"
)
// Type names only, split once up front instead of once per cell.
val columnTypes: Array[String] = userSchema.map(_.split(" ")(1))

/**
 * Converts one raw cell to the JVM type declared for its column.
 *
 * @param col (raw text, column index) as produced by zipWithIndex
 * @return a String for "string" columns, an Int for "int" columns
 * @throws IllegalArgumentException if the declared column type is not supported
 */
def toDataType(col: (String, Int)): Any = {
  columnTypes(col._2) match {
    case "string" => col._1.toString
    case "int" => col._1.toInt
    case other =>
      throw new IllegalArgumentException(s"unsupported column type '$other' for column ${col._2}")
  }
}

// 1. Row RDD.
// rdd.map(_.split(" ")).map(line => Row.apply(line(0), line(1).toInt)) // the hard-coded variant
val rddRow: RDD[Row] = rdd.map(_.split(" "))
  .map(x => x.zipWithIndex) // [(chauncy,0), (18,1), ...]
  .map(x => x.map(toDataType(_)))
  .map(line => Row.fromSeq(line)) // every column of the Row must carry its exact type

// 2. StructType.
/**
 * Maps a type name from userSchema to the Spark SQL data type.
 *
 * @throws IllegalArgumentException if the type name is not supported
 */
def getDataType(v: String) = {
  v match {
    case "string" => DataTypes.StringType
    case "int" => DataTypes.IntegerType
    case other => throw new IllegalArgumentException(s"unsupported column type '$other'")
  }
}

// Column definitions derived from userSchema.
val fields: Array[StructField] = userSchema
  .map(_.split(" "))
  .map(x => StructField(x(0), getDataType(x(1))))
val schema: StructType = StructType(fields)
// schema01 describes the same columns as schema, written as a DDL string.
val schema01: StructType = StructType.fromDDL("name string,age int,sex int")

// The method is createDataFrame (capital F); `createDataframe` does not exist on SparkSession.
val dataframe = session.createDataFrame(rddRow, schema01)
dataframe.show()
dataframe.printSchema()

// Register in the session catalog.
dataframe.createTempView("user")
session.sql("select * from user").show()
- 方式二 – bean类型的rdd + javabean
/**
 * Mutable JavaBean with three properties: name, age and sex.
 *
 * `@BeanProperty` generates the Java-style accessors (getName/setName, getAge/setAge,
 * getSex/setSex) next to the Scala fields. The class is Serializable.
 */
class Person extends Serializable {
  @BeanProperty var name: String = ""
  @BeanProperty var age: Int = 0
  @BeanProperty var sex: Int = 0
}
-------
// Approach 2: an RDD of beans plus the JavaBean class.
val rdd = sc.textFile("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/person.txt")

// Author's notes on the execution model:
//   1. MR / Spark pipelines iterate: one record passes through memory at a time and is
//      read, computed and serialized before the next one.
//   2. In distributed execution the computation logic is serialized by the Driver and sent
//      to the Executors running in other JVMs.
// The original shared a single driver-side Person and mutated it for every line. That only
// looks correct while records are consumed strictly one at a time; as soon as the RDD is
// cached, collected or shuffled, every element is the same object holding the last line's
// values. Allocate a fresh bean per record instead.
val rddBean: RDD[Person] = rdd.map(_.split(" ")).map { arr =>
  val person = new Person
  person.setName(arr(0))
  person.setAge(arr(1).toInt)
  person.setSex(arr(2).toInt)
  person
}

// The method is createDataFrame (capital F); `createDataframe` does not exist on SparkSession.
val dataframe = session.createDataFrame(rddBean, classOf[Person])
dataframe.show()
dataframe.printSchema()

// Register in the session catalog.
dataframe.createTempView("user")
session.sql("select * from user").show()
创建DataSet
读取文本文件并生成dataset实例
/** One record of the JSON input: a name and an age. Case classes are already Serializable. */
case class User(
  name: String,
  age: BigInt
)
/**
 * Reads the JSON file as a typed Dataset[User], prints it, then prints the rows
 * whose age is at least 20.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder()
    .config(new SparkConf())
    .master("local")
    .appName("hello")
    .getOrCreate()
  // Provides the encoder needed by as[User] and the $"col" syntax.
  import spark.implicits._

  // Input is JSON with one record per line, e.g. {"name":"zhangsan","age":20}
  val jsonPath = "file:///Users/jinxingguang/java_project/bigdata3/spark-demo/data/json"
  val data = spark.read.json(jsonPath).as[User]

  data.show()
  val atLeastTwenty = data.filter($"age" >= 20)
  atLeastTwenty.show()
}
自定义属性生成dataset
// A Spark Dataset can be used collection-style (like an RDD) as well as through SQL.
val personFile = "file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/person.txt"
val rddData: Dataset[String] = session.read.textFile(personFile)
import session.implicits._

// Turn each text line into a (name, age) tuple.
val person: Dataset[(String, Int)] = rddData.map { line =>
  val tokens = line.split(" ")
  val name = tokens(0)
  val age = tokens(1).toInt
  (name, age)
}

// Attach column names to the tuple fields.
val cPerson = person.toDF("name", "age")
cPerson.show()
cPerson.printSchema()
DataSet的使用 - 读取文本文件
// Word count over an in-memory list, written to and read back from parquet.
val conf = new SparkConf().setMaster("local").setAppName("sql hive")
val session = SparkSession
  .builder()
  .config(conf)
  // .enableHiveSupport()
  .getOrCreate()
val sc = session.sparkContext
sc.setLogLevel("Error")
import session.implicits._

// The Spark type is DataFrame (capital F); `Dataframe` does not exist and does not compile.
val dataDF: DataFrame = List(
  "hello world",
  "hello world",
  "hello msb",
  "hello world",
  "hello world",
  "hello spark",
  "hello world",
  "hello spark"
).toDF("line") // single column named "line"

dataDF.createTempView("ooxx") // register in the session catalog
val df: DataFrame = session.sql("select * from ooxx")
df.show()
df.printSchema()

// Word frequency, SQL flavour:
// session.sql(" select word,count(1) from (select explode(split(line,' ')) word from ooxx) as tt group by word").show()

// Word frequency, API flavour: the DataFrame plays the role of "from table".
val subTab = dataDF.selectExpr("explode(split(line,' ')) word")
val dataset: RelationalGroupedDataset = subTab.groupBy("word")
val res = dataset.count()

// Save the result as parquet files.
// NOTE(review): SaveMode.Append adds to whatever is already under this path.
res.write.mode(SaveMode.Append).parquet("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/out/ooxx")

// Read the parquet files back.
val frame: DataFrame = session.read.parquet("file:///Users/jinxingguang/java_project/bigdata-chauncy/spark-demo/data/out/ooxx")
frame.show()
frame.printSchema()
DataSet的使用 - 读取MySQL
// Read two MySQL tables over JDBC, register them as temp views and join them with SQL.
// val conf = new SparkConf().setMaster("local").setAppName("mysql")
val session = SparkSession
  .builder()
  .master("local")
  .appName("mysql")
  // Run shuffles with a single partition for this small local job.
  // NOTE(review): the original comment said the default is 100; Spark documents the default
  // of spark.sql.shuffle.partitions as 200 - confirm against the cluster configuration.
  .config("spark.sql.shuffle.partitions", "1")
  // .enableHiveSupport()
  .getOrCreate()
val sc = session.sparkContext
// sc.setLogLevel("ERROR")
sc.setLogLevel("INFO")

// Keep the URL in one place instead of reading it back out of the Properties object.
val jdbcUrl = "jdbc:mysql://192.168.7.17:3306/mysql_test"
val properties = new Properties()
properties.setProperty("url", jdbcUrl)
properties.setProperty("user", "xxx")
properties.setProperty("password", "xxx")
properties.setProperty("driver", "com.mysql.jdbc.Driver")

// The Spark type is DataFrame (capital F); `Dataframe` does not exist and does not compile.
val usersDF: DataFrame = session.read.jdbc(jdbcUrl, "student", properties)
val scoreDF: DataFrame = session.read.jdbc(jdbcUrl, "score", properties)
usersDF.createTempView("userstab")
scoreDF.createTempView("scoretab")

val resDF: DataFrame = session.sql("select userstab.s_id,userstab.s_name, scoretab.s_score from userstab join scoretab on userstab.s_id = scoretab.s_id")
resDF.show()
resDF.printSchema()

// Author's observation about the parallelism of the job (log excerpt):
// 21/10/13 07:47:05 INFO DAGScheduler: Submitting 100 missing tasks from ResultStage 11
Spark_On_Hive
standalone模式
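pom.xml 依赖(原文的 XML 标签在提取时丢失,以下按原有顺序还原):

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.12</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.4</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.3.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.3.4</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.15.2</version>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <goals>
            <goal>compile</goal>
          </goals>
          <configuration>
            <includes>
              <include>**/*.scala</include>
            </includes>
          </configuration>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>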
// Standalone mode: Hive support enabled, warehouse kept in a local directory.
val warehouseDir = "file:///Users/jinxingguang/java_project/bigdata-chauncy/spark/warehouse"
val ss: SparkSession = SparkSession.builder()
  .master("local")
  .appName("standalone hive")
  .config("spark.sql.shuffle.partitions", 1)
  .config("spark.sql.warehouse.dir", warehouseDir)
  // Enable Hive support. Author's open question: does this start a Hive metastore by itself?
  .enableHiveSupport()
  .getOrCreate()
val sc: SparkContext = ss.sparkContext
// sc.setLogLevel("ERROR")

// One-off setup of the demo table, left disabled:
// ss.sql("create table xxx(name string,age int)")
// ss.sql("insert into xxx values ('zhangsan',18),('lisi',22)")
val allRows = ss.sql("select * from xxx")
allRows.show()
ss.catalog.listTables().show()

// Databases are supported as well: create one, switch to it, create a table inside it.
Seq(
  "create database chauncy_db",
  "use chauncy_db",
  "create table meifute(name string,age int)"
).foreach(statement => ss.sql(statement))
ss.catalog.listTables().show()
hive集群模式
搭建hive-metastore服务的配置
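hiveserver2服务的配置
############################################
其他hive节点,可以是hive命令行,可以是hiveserver2服务
node02机器,node03机器,node04机器

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hive_remote/warehouse</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://192.168.7.17:3306/hive_remote?createDatabaseIfNotExist=true</value>
  <description>数据库连接</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.cj.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>meifute</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>meifute</value>
</property>
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://node01:9083</value>
  <description>metastore地址</description>
</property>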
hive 的命令行简单介绍

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://node01:9083</value>
  <description>metastore地址</description>
</property>
######## hive 的命令行
hive --service --help

启动metastore
hive --service metastore

在node02启动hive命令行
hive --service cli    (等于 hive)
show databases;
use default;
show tables;

在node03启动hiveserver2
hive --service hiveserver2    (等价于 hiveserver2)

在node04使用beeline连接hiveserver2
beeline -u jdbc:hive2://node03:10000/default -n god

IDEA使用spark程序交互hive
// Talk to a remote Hive metastore from a local Spark program.
val ss: SparkSession = SparkSession
  .builder()
  .appName("cluster on hive")
  .master("local")
  .config("hive.metastore.uris", "thrift://node01:9083")
  .enableHiveSupport()
  .getOrCreate()
val sc: SparkContext = ss.sparkContext
sc.setLogLevel("ERROR")

ss.sql("create database IF NOT EXISTS spark_hive ")
ss.sql("use spark_hive")
// Author's note: this call failed until the HBase-backed table was removed from Hive:
//   Class org.apache.hadoop.hive.hbase.HbaseSerDe not found
ss.catalog.listTables().show()

import ss.implicits._
// The Spark type is DataFrame (capital F); `Dataframe` does not exist and does not compile.
val df01: DataFrame = List(
  "zhangsan",
  "lisi"
).toDF("name")
df01.createTempView("ooxx") // register a temporary view in the session catalog

// Create a table with SQL and insert data.
ss.sql("create table IF NOT EXISTS hddffs ( id int,age int)") // DDL
// Author's note: requires the Hadoop configuration files core-site.xml and hdfs-site.xml;
// the data is written by Spark itself talking to HDFS.
ss.sql("insert into hddffs values (4,3),(8,4),(9,3)") // DML

// Author's note: at this point ooxx is only a temporary view, not saved in Hive.
ss.sql("show tables").show()

// Author's note: saves the data in Hive as table oxox, unlike the temporary registration above.
df01.write.saveAsTable("oxox")
// List the tables again after saveAsTable.
ss.sql("show tables").show()
Spark-Sql服务
SPARK整合HIVE
- spark配置
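只需要metastore的地址配置就可以了

cat > /opt/bigdata/spark-2.3.4/conf/hive-site.xml <<-EOF
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://node01:9083</value>
    <description>metastore地址</description>
  </property>
</configuration>
EOF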
- spark相关的命令启动服务
启动spark-shell
cd /opt/bigdata/spark-2.3.4/bin
./spark-shell --master yarn
scala> spark.sql("show tables").show
启动spark-sql
cd /opt/bigdata/spark-2.3.4/bin
./spark-sql --master yarn
查看网页 http://node03:8088/cluster 会出现SparkSQL
可以直接执行SQL,跟hive中共享,两边都可以操作
spark-sql> show tables;
Spark-Sql
# 对外暴露JDBC服务,接受SQL执行
cd /opt/bigdata/spark-2.3.4/sbin
./start-thriftserver.sh --master yarn
查看网页 http://node03:8088/cluster **多了一个 Thrift JDBC/ODBC Server**

# 使用spark的beeline连接
/opt/bigdata/spark-2.3.4/bin/beeline -u jdbc:hive2://node01:10000/default -n god
打印 Connected to: Spark SQL (version 2.3.4)
/usr/lib/spark-current/bin/beeline -u jdbc:hive2://localhost:10000 -n hadoop
show tables;

spark-sql函数
官网文档 2.3.4版本
自定义函数
org/apache/spark/sql/functions.scala scala对应的源码
package com.chauncy.spark_dataframe_dataset
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
 * Demo of Spark SQL functions on a small in-memory table: ordering, a scalar UDF
 * and a user-defined aggregate function that averages a column.
 */
object MySpark_Sql_functions {

  /**
   * Builds a local session, registers the demo data as temp view "users" and runs
   * the three example queries, printing each result.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val ss = SparkSession
      .builder()
      .appName("functions")
      .master("local")
      .getOrCreate()
    ss.sparkContext.setLogLevel("ERROR")
    import ss.implicits._

    // Turn a List of tuples into a DataFrame.
    // The Spark type is DataFrame (capital F); `Dataframe` does not exist.
    val dataDF: DataFrame = List(
      ("A", 1, 67),
      ("D", 1, 87),
      ("B", 1, 54),
      ("D", 2, 24),
      ("C", 3, 64),
      ("R", 2, 54),
      ("E", 1, 74)
    ).toDF("name", "class", "score")
    dataDF.createTempView("users")

    // Grouping + ordering example, left disabled:
    // ss.sql("select name,sum(score) " +
    //   " from users " +
    //   "group by name" +
    //   " order by name").show()
    ss.sql("select * from users order by name asc,score desc").show()

    // Plain (scalar) user-defined function: multiplies its argument by 10.
    ss.udf.register("ooxx", (x: Int) => {
      x * 10
    })
    ss.sql("select *,ooxx(score) mut_10 from users ").show()

    // User-defined aggregate function.
    ss.udf.register("myagg", new MyAvgFun)
    ss.sql("select name," +
      " myagg(score) " +
      " from users " +
      " group by name").show()
  }

  /**
   * Aggregate function computing the average of an Int column.
   * The buffer holds two Ints: the running sum (slot 0) and the row count (slot 1).
   */
  class MyAvgFun extends UserDefinedAggregateFunction {

    /** Type of the input column, e.g. myagg(score). */
    override def inputSchema: StructType =
      StructType(Array(StructField("score", IntegerType, false)))

    /** Layout of the intermediate buffer: (sum, count). */
    override def bufferSchema: StructType =
      StructType(
        Array(
          StructField("sum", IntegerType, false),
          StructField("count", IntegerType, false)
        )
      )

    /** Type of the final result. */
    override def dataType: DataType = DoubleType

    /** The same input always yields the same output. */
    override def deterministic: Boolean = true

    /** Resets the buffer to sum = 0, count = 0. */
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0
      buffer(1) = 0
    }

    /** Called once per record of a group: adds the value to the sum and bumps the count. */
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer(0) = buffer.getInt(0) + input.getInt(0) // sum
      buffer(1) = buffer.getInt(1) + 1 // count
    }

    /** Combines two partial buffers by adding their sums and their counts. */
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
      buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
    }

    /**
     * Final result: sum / count as a Double.
     *
     * The sum is widened to Double before dividing. The original divided Int by Int,
     * which drops the fraction first (e.g. 111 / 2 gave 55.0 instead of 55.5).
     * With a count of 0 this now yields NaN rather than an ArithmeticException.
     */
    override def evaluate(buffer: Row): Double =
      buffer.getInt(0).toDouble / buffer.getInt(1)
  }
}



