栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkSQL 数据源的加载与保存

SparkSQL 数据源的加载与保存

Spark SQL 支持通过 Dataframe 接口对多种数据源进行操作。可以使用关系转换对 Dataframe 进行操作,也可以用于创建临时视图。将 Dataframe 注册为临时视图允许对其数据运行 SQL 查询。

1. 通用的加载/保存功能

数据源由它们的完全限定名称(即org.apache.spark.sql.parquet)指定,但对于内置源,可以使用它们的短名称(json、parquet、jdbc、orc、libsvm、csv、text)。从任何数据源类型加载的 Dataframe 都可以使用此语法转换为其他类型。

 默认的数据加载/保存格式为 parquet

1.1 加载数据

spark.read.load 是加载数据的通用方法!对不同的数据,可以使用不同的数据格式进行设定。

语法格式:

spark.read.format("...")[.option("...")].load("...")

参数描述:

  • format:指定加载的数据类型,包括 csv、jdbc、json、orc、parquet、textFile
  • load:加载数据的路径
  • option:在 jdbc 格式下,需要传入 JDBC 相应的参数(url、user、password、dbtable)

使用 SQL 方式:文件格式.文件路径

spark.sql(select * from json.`path`).show
1.2 保存数据

df.write.save 是保存数据的通用方法!

语法格式:

df.write.format("...")[.option("...")].save("...")

参数描述:

  • format:指定加载的数据类型,包括 csv、jdbc、json、orc、parquet、textFile
  • save:保存数据的路径
  • option:在 jdbc 格式下,需要传入 JDBC 相应的参数(url、user、password、dbtable);也可以通过 mode 方法使用 SaveMode 指明如何处理数据

SaveMode 是一个枚举类型,都是没有加锁的,也不是原子操作!

Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists (default)"error" or "errorifexists" (default)如果数据已经存在,则抛出异常
SaveMode.Append"append"如果数据/表已经存在,则追加
SaveMode.Overwrite"overwrite"如果数据/表已经存在,则覆盖
SaveMode.Ignore"ignore"如果数据已经存在,则不操
1.3 持久化到表中

Dataframes 也可以使用 saveAsTable 命令将其作为持久表保存到 Hive metastore 中。需要注意的是,使用此功能不需要现有的 Hive 部署。Spark 将会创建一个默认的本地 Hive 元存储(使用 Derby)。

与 createOrReplaceTempView 命令不同, saveAsTable 将实现 Dataframe 的内容,并创建一个指向Hive metastore 中的数据的指针。只要持有 metastore 的连接,即使 Spark 程序重新启动,表也仍然存在。持久化表的 Dataframe 可以通过调用 SparkSession 上的 table 方法来创建。

对于基本文件的数据源,例如 text、parquet、json 等,您可以通过 path 选项指定自定义表路径 ,例如 df.write.option("path", "/some/path").saveAsTable("t")。删除表时,不会删除自定义表路径,表数据仍然存在。如果未指定自定义表路径,Spark 会将数据写入到仓库目录下的默认表路径中。当表被删除时,默认的表路径也将被删除。

从 Spark 2.1 开始,持久数据源表将每个分区的元数据存储在 Hive 元存储中。这带来了几个好处:

  • 因为metastore只能为查询返回必要的分区,所以不再需要在第一个查询中查所有的分区。
  • Hive DDL 操作比如ALTER TABLE PARTITION ... SET LOCATION,现在可用数据源 API 创建的表。

需注意,在创建外部数据源表(带有path选项的表)时,默认情况下不会收集分区信息。如果要同步 metastore 中的分区信息,可以调用 MSCK REPAIR TABLE。

1.4 分桶、排序、分区

基于文件的数据源,可以对输出进行分桶或分区并排序。

分桶和排序仅适用于持久化到表中:

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

当使用 Dataset API 时,使用save和saveAsTable 之前可使用分区。

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

对单个表同时使用分区和分桶:

usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")

partitionBy 会创建一个目录结构,因此,它对具有高基数的列的适用性有限。相反, bucketBy将数据分布在固定数量的桶中,并且可以在唯一值的数量不受限制时使用。

2. 文件格式 2.1 Parquet

Spark SQL 的默认数据源为 Parquet 格式,Parquet 是一种流行的列式存储格式,可以高效的存储具有嵌套字段的记录。

存储方式的示意图如下:

Parquet 加载/保存:

加载数据:

val df = spark.read.load("path")

保存数据:

df.write.mode("...").save("path")
2.2 JSON

Spark SQL 能够自动推测 JSON 数据集的结构,并将其加载为 DataSet[Row]。

JSON 加载/保存:

加载数据:

val df = spark.read.format("json").load("path")
val df = spark.read.json("path")

保存数据:

df.write.format("json")[.mode("...")].save("path")
2.3 CSV

Spark 可以配置 CSV 文件的列信息,读取 CSV 文件时,第一行可作为表头。

spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("path")
2.4 MySQL

Spark SQL 可通过 JDBC 从关系型数据库中读取数据并创建 Dataframe。

如果使用 spark-shell 操作,可以在启动 shell 时指定相关的数据驱动路径或者将相关的数据库驱动放到 spark 的类路径下。

如下,演示 IDEA 中的 MySQL 操作!

2.4.1 添加依赖

    mysql
    mysql-connector-java
    8.0.25

2.4.2 读取数据

连接方式一:

spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/student")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("user", "root")
  .option("password", "123456")
  .option("dbtable", "books")
  .load()
  .show()

连接方式二:

spark.read.format("jdbc")
  .options(Map("url" -> "jdbc:mysql://localhost:3306/student?user=root&password=123456"
    , "driver" -> "com.mysql.cj.jdbc.Driver"
    , "dbtable" -> "books"))
  .load()
  .show()

连接方式三:

val prop: Properties = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "123456")
val url = "jdbc:mysql://localhost:3306/student"
spark.read.jdbc(url, "books", prop)
  .show()
2.4.3 写入数据

写入方式一:

val df =  spark.read.load("employees.json")
df.write.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/student")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("user", "root")
  .option("password", "root199962")
  .option("dbtable", "books")
  .mode(SaveMode.Append)
  .save()

写入方式二:

val prop: Properties = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "root199962")
val url = "jdbc:mysql://localhost:3306/student?user=root&password=root199962"
df.write.mode(SaveMode.Append).jdbc(url, "books", prop)
2.5 Hive 2.5.1 使用内置 Hive

内置的 Hive 会将元数据存储在 derby 中,默认的仓库地址为 $SPARK_HOME/spark-warehouse

如果使用的是内部的Hive,在Spark2.0之后,spark.sql.warehouse.dir用于指定数据仓库的地址,如果你需要是用HDFS作为路径,那么需要将core-site.xml和hdfs-site.xml 加入到Spark conf目录,否则只会创建master节点上的warehouse目录,查询时会出现文件找不到的问题,这是需要向使用HDFS,则需要将metastore删除,重启集群。

1️⃣ 启动 HDS 集群,启动 Spark

2️⃣ 启动 spark-sql

spark-sql

如果时初次启动 SparkSQL,会在 spark 的 bin 目录下生成两个文件,一个是元数据,一个是derby数据库

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NXTzsyam-1640230158891)(https://cdn.jsdelivr.net/gh/JOEL-T99/Pic//img/image-20211223102622361.png)]

3️⃣ 启动 SparkSQL 后,可直接使用 SQL 语句,可以发现,有一个像 Hive 刚开始进去的一个默认的 default 数据库,但是并没有 Hive 中其他的数据库。因为 Hive 的元数据配放在 MySQL 下,而现在使用的时 SparkSQL 内置的 Hive,该内置 Hive 默认使用 derby 存储元数据。

 所以我们需要做的是,将 Hive 的相关配置文件,复制到 Spark 中。

4️⃣ 在 bin 目录下,删除刚生成的两个新文件

rm -rf metastore_db derby.log

5️⃣ 将 Hive 的 hive-site.xml 复制(cp)或到本机 spark 的 conf 目录下,并发送(scp)其他主机的对应目录

scp /opt/hive-2.3.9/conf/hive-site.xml <其他主机>

6️⃣ 将连接MySQL 的驱动包,复制到本机 spark 的 jars 目录下,并发送(scp)其他主机的对应目录

scp /opt/hive-2.3.9/lib/mysql-connector-java-8.0.25.jar <其他主机>

7️⃣ 再次启动 SparkSQL,并查看数据库是否存在,如下图,已经能查到数据库和相关的表了!

8️⃣ 查看表中的数据

 select * from weblog_extracted limit 5;

9️⃣ 查询时使用函数

 select ip, avg(up) avg_up, avg(down) avg_down, max(up) max_up from weblog_extracted group by ip limit 5;

1️⃣0️⃣ 在主机的8080端口,可以查看 Spark 任务,可以在最后一个页面看到,SparkSQL 底层跑的还是 RDD

Hive 底层跑的是 MapReduce!

[外链图片转存中…(img-95Fn0vRq-1640230158897)]

2.5.2 使用外部 Hive

如果想连接外部已经部署好的Hive,需要通过以下几个步骤:

  • Spark 要接管Hive 需要把 hive-site.xml 拷贝到 conf/目录下
  • 把 Mysql 的驱动 copy 到 jars/目录下
  • 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
  • 重启 spark-shell
$ bin/spark-shell --master spark://hadoop01:7077 --jars mysql-connector-java-5.1.27-bin.jar

 


❤️ END ❤️
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676834.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号