1、将构建好的Iceberg的spark模块jar包,复制到spark jars下
cp /opt/module/iceberg-apache-iceberg-0.11.1/spark3-extensions/build/libs
def readTale(sparkSession: SparkSession) = {
//三种方式
sparkSession.table("hadoop_prod.db.testA").show()
sparkSession.read.format("iceberg").load("hadoop_prod.db.testA").show()
sparkSession.read.format("iceberg").load("/hive/warehouse/db/testA").show()// 路径到表就行,不要到具体文件
}
3.2、Spark DF读取Iceberg快照表的两种方式
def readSnapShots(sparkSession: SparkSession) = {
//根据查询 hadoop_prod.db.testA.snapshots 快照表可以知道快照时间和快照id
//根据时间戳读取,必须是时间戳 不能使用格式化后的时间
sparkSession.read
.option("as-of-timestamp", "1624961454000") //毫秒时间戳,查询比该值时间更早的快照
.format("iceberg")
.load("hadoop_prod.db.testA").show()
//根据快照 id 查询
sparkSession.read
.option("snapshot-id", "9054909815461789342")
.format("iceberg")
.load("hadoop_prod.db.testA").show()
}
3.3、写入数据并自动创建表
def writeAndCreateTable(sparkSession: SparkSession) = {
import sparkSession.implicits._
import org.apache.spark.sql.functions._
val data = sparkSession.createDataset[Student](Array(Student(1001, " 张 三 ", 18, "2021-06-28"),
Student(1002, "李四", 19, "2021-06-29"), Student(1003, "王五", 20, "2021-06-29")))
data.writeTo("hadoop_prod.db.test1").partitionedBy(col("dt")) //指定dt为分区列
.create()
}
case class Student(id: Int, name: String, age: Int, dt: String)
3.4、写数据
3.4.1、Append
def AppendTable(sparkSession: SparkSession) = {
//两种方式
import sparkSession.implicits._
val data = sparkSession.createDataset(Array(Student(1003, "王五", 11, "2021-06-29"), Student(1004, "赵六", 10, "2021-06-30")))
data.writeTo("hadoop_prod.db.test1").append() // 使用DataframeWriterV2 API spark3.0
data.write.format("iceberg").mode("append").save("hadoop_prod.db.test1") //使用DataframeWriterV1 API spark2.4
}
3.4.2、Overwrite
1、动态覆盖,只会刷新所属分区数据
// 动态覆盖,只会刷新所属分区数据
def OverWriteTable(sparkSession: SparkSession) = {
import sparkSession.implicits._
val data = sparkSession.createDataset(Array(Student(1003, " 王五", 11, "2021-06-29"),
Student(1004, "赵六", 10, "2021-06-30")))
data.writeTo("hadoop_prod.db.test1").overwritePartitions()
}
2、静态覆盖,手动指定分区
def OverWriteTable2(sparkSession: SparkSession) = {
import sparkSession.implicits._
val data = sparkSession.createDataset(Array(Student(1, "s1", 1, "111"), Student(2, "s2", 2, "111")))
data.writeTo("hadoop_prod.db.test1").overwrite($"dt" === "2021-06-30")
}
注意:输出表不能存在多个分区。不然会报错



