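引入依赖（Maven pom.xml）:
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>4.14.0-HBase-1.4</version>
</dependency>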
读取 Phoenix 数据
package com.spark.core.phoenix
import org.apache.spark.sql.SparkSession
object ReadFromPhoenix {

  // Defaults used when no command-line overrides are supplied.
  private val DefaultTable = "testdb.student"
  private val DefaultZkUrl = "localhost:2181"
  private val Columns = Seq("ID", "NAME", "SEX", "PHONE", "EMAIL")

  /**
   * Demonstrates three ways of reading a Phoenix table with phoenix-spark:
   * the Spark data source (`format("org.apache.phoenix.spark")`),
   * `phoenixTableAsDataFrame` on an SQLContext, and `phoenixTableAsRDD` on a SparkContext.
   *
   * @param args optional overrides: `args(0)` = ZooKeeper URL (default `localhost:2181`),
   *             `args(1)` = Phoenix table name (default `testdb.student`)
   */
  def main(args: Array[String]): Unit = {
    val zkQuorum = args.lift(0).getOrElse(DefaultZkUrl)
    val tableName = args.lift(1).getOrElse(DefaultTable)

    val spark = SparkSession.builder().master("local").appName("test").getOrCreate()
    try {
      readWithDataSource(spark, tableName, zkQuorum).show()
      readWithSqlContext(spark, tableName, zkQuorum).show()
      readAsRdd(spark, tableName, zkQuorum).collect().foreach(println)
    } finally {
      // Release the local Spark context even if one of the reads fails.
      spark.stop()
    }
  }

  /** Loads the whole table as a DataFrame through the Spark data source provided by phoenix-spark. */
  private def readWithDataSource(spark: SparkSession, tableName: String, zkQuorum: String) =
    spark.read
      .format("org.apache.phoenix.spark")
      .option("table", tableName)
      .option("zkUrl", zkQuorum)
      .load()

  /** Loads the selected columns as a DataFrame, restricted by a WHERE predicate (`SEX = 1`). */
  private def readWithSqlContext(spark: SparkSession, tableName: String, zkQuorum: String) = {
    // Brings the phoenix-spark implicit extensions for SQLContext into scope.
    import org.apache.phoenix.spark._
    spark.sqlContext.phoenixTableAsDataFrame(
      table = tableName,
      columns = Columns,
      predicate = Some("SEX = 1"), // optional WHERE condition
      zkUrl = Some(zkQuorum))
  }

  /** Loads the selected columns as an RDD. */
  private def readAsRdd(spark: SparkSession, tableName: String, zkQuorum: String) = {
    // Brings the phoenix-spark implicit extensions for SparkContext into scope.
    import org.apache.phoenix.spark._
    spark.sparkContext.phoenixTableAsRDD(
      table = tableName,
      columns = Columns,
      zkUrl = Some(zkQuorum))
  }
}
写入 Phoenix 数据
package com.spark.core.phoenix
import org.apache.spark.sql.SparkSession

/**
 * One sample row for the Phoenix student table. Fields are written positionally to the
 * columns ID, NAME, SEX, PHONE, EMAIL. Declared at top level (after the package clause) so that
 * `spark.implicits._` can derive an encoder for it.
 */
final case class student(id: String, name: String, sex: Short, phone: String, email: String)

object WriteToPhoenix {

  // Defaults used when no command-line overrides are supplied.
  private val DefaultTable = "testdb.student"
  private val DefaultZkUrl = "localhost:2181"
  private val Columns = Seq("ID", "NAME", "SEX", "PHONE", "EMAIL")

  /**
   * Writes the same two sample rows to a Phoenix table in three ways: the Spark data source,
   * `saveToPhoenix` on a DataFrame, and `saveToPhoenix` on an RDD of case classes.
   * NOTE(review): each path writes identical rows; this presumably relies on Phoenix UPSERT
   * semantics keyed on the table's primary key — confirm against the table definition.
   *
   * @param args optional overrides: `args(0)` = ZooKeeper URL (default `localhost:2181`),
   *             `args(1)` = Phoenix table name (default `testdb.student`)
   */
  def main(args: Array[String]): Unit = {
    val zkQuorum = args.lift(0).getOrElse(DefaultZkUrl)
    val targetTable = args.lift(1).getOrElse(DefaultTable)

    val spark = SparkSession.builder().master("local").appName("test").getOrCreate()
    try {
      import spark.implicits._
      val rdd = spark.sparkContext.parallelize(Seq(
        student("2021120604", "小明", 1, "13783782988", "123456789@email.com"),
        student("2021120605", "小红", 0, "13786564688", "888@email.com")))
      val df = rdd.toDF(Columns: _*)

      // Spark data source path; "overwrite" is the only save mode this connector accepts.
      df.write
        .format("org.apache.phoenix.spark")
        .mode("overwrite")
        .option("table", targetTable)
        .option("zkUrl", zkQuorum)
        .save()

      // Brings the phoenix-spark implicit extensions for DataFrame and RDD into scope.
      import org.apache.phoenix.spark._
      // DataFrame path: takes the same option keys as the data source above.
      df.saveToPhoenix(Map("table" -> targetTable, "zkUrl" -> zkQuorum))
      // RDD path: `cols` names the target columns in the case-class field order.
      rdd.saveToPhoenix(
        tableName = targetTable,
        cols = Columns,
        zkUrl = Some(zkQuorum))
    } finally {
      // Release the local Spark context even if one of the writes fails.
      spark.stop()
    }
  }
}



