Spark SQL 开放了一系列接入外部数据源的接口,供开发者实现自定义数据源;这些接口定义在 org.apache.spark.sql.sources 包下的 interfaces.scala 中。
package com.yyds.tags.spark.hbase

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
 * Entry point of the custom "hbase" data source.
 *
 * Spark discovers this class through the META-INF/services registration of
 * [[DataSourceRegister]]; `shortName()` lets callers write `.format("hbase")`.
 * Reading goes through `createRelation(sqlContext, parameters)`, writing
 * through the 4-argument overload.
 */
class DefaultSource extends RelationProvider with CreatableRelationProvider with DataSourceRegister {

  // Separator used inside the "selectFields" option value, e.g. "id,gender".
  val SPERATOR: String = ","
  // Option key listing the columns to read. Fixed: the definition previously
  // had a typo (`Hbase_TABLE_SELECt_FIELDS`) while the usage below referenced
  // this correct spelling, which did not compile.
  val Hbase_TABLE_SELECT_FIELDS: String = "selectFields"

  /** Short alias so callers can use .format("hbase") instead of the class name. */
  override def shortName(): String = "hbase"

  /**
   * Creates a relation for READING from HBase.
   *
   * The schema is built from the comma-separated "selectFields" option; HBase
   * stores raw bytes, so every column is surfaced as a nullable StringType.
   *
   * @param sqlContext active SQLContext
   * @param parameters the options passed via `.option(...)` (zkHosts, zkPort,
   *                   hbaseTable, family, selectFields)
   * @return a [[HbaseRelation]] ready for table scans
   */
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]
  ): BaseRelation = {
    val schema: StructType = StructType(
      parameters(Hbase_TABLE_SELECT_FIELDS)
        .split(SPERATOR)
        .map(field => StructField(field, StringType, nullable = true))
    )
    HbaseRelation(sqlContext, parameters, schema)
  }

  /**
   * Creates a relation for WRITING: builds a [[HbaseRelation]] from the
   * DataFrame's own schema and immediately persists `data`.
   *
   * NOTE: `mode` is currently ignored by `HbaseRelation.insert` — every save
   * behaves like an upsert (HBase Put semantics), regardless of SaveMode.
   */
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame
  ): BaseRelation = {
    val relation = HbaseRelation(sqlContext, parameters, data.schema)
    relation.insert(data, overwrite = true)
    relation
  }
}
(2)HbaseRelation:实现读(TableScan)与写(InsertableRelation)的核心类
package com.yyds.tags.spark.hbase

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
import org.apache.spark.sql.types.StructType

/**
 * Relation backing the custom "hbase" data source.
 *
 * Reading ([[TableScan]]): scans one column family of one HBase table via
 * TableInputFormat and maps every cell to a String column.
 * Writing ([[InsertableRelation]]): converts each Row into a Put and saves
 * through TableOutputFormat.
 *
 * @param context    active SQLContext
 * @param params     data source options: zkHosts, zkPort, hbaseTable, family,
 *                   selectFields (read), rowKeyColumn (write)
 * @param userSchema flat all-StringType schema derived from selectFields
 */
case class HbaseRelation(context: SQLContext,
                         params: Map[String, String],
                         userSchema: StructType)
    extends BaseRelation
    with TableScan
    with InsertableRelation
    with Serializable {

  // HBase client property names (conf keys) ...
  val Hbase_ZK_QUORUM_KEY: String = "hbase.zookeeper.quorum"
  val Hbase_ZK_PORT_KEY: String = "hbase.zookeeper.property.clientPort"
  // ... and the option names under which the caller supplies their values.
  val Hbase_ZK_QUORUM_VALUE: String = "zkHosts"
  val Hbase_ZK_PORT_VALUE: String = "zkPort"
  val Hbase_TABLE: String = "hbaseTable"
  val Hbase_TABLE_FAMILY: String = "family"
  val SPERATOR: String = ","
  val Hbase_TABLE_SELECT_FIELDS: String = "selectFields"
  val Hbase_TABLE_ROWKEY_NAME: String = "rowKeyColumn"

  override def sqlContext: SQLContext = context

  override def schema: StructType = userSchema

  /** Builds an HBase client Configuration with the caller-supplied ZK settings. */
  private def hbaseConf(): Configuration = {
    val conf = HBaseConfiguration.create()
    // BUGFIX: the quorum value lives in params under the OPTION name "zkHosts"
    // (Hbase_ZK_QUORUM_VALUE), not under the conf-property name; the original
    // looked up params(Hbase_ZK_QUORUM_KEY) and threw NoSuchElementException.
    conf.set(Hbase_ZK_QUORUM_KEY, params(Hbase_ZK_QUORUM_VALUE))
    // BUGFIX: the original set conf key "zkPort" (the option name) instead of
    // the real HBase property "hbase.zookeeper.property.clientPort".
    conf.set(Hbase_ZK_PORT_KEY, params(Hbase_ZK_PORT_VALUE))
    conf
  }

  /**
   * Scans the configured table/family and returns one Row per HBase row,
   * with the selectFields columns decoded as Strings (null cells -> null).
   */
  override def buildScan(): RDD[Row] = {
    // 1. HBase client config (ZK quorum + port) plus the znode parent.
    val conf = hbaseConf()
    conf.set("zookeeper.znode.parent", "/hbase")
    // 2. Table to read.
    conf.set(TableInputFormat.INPUT_TABLE, params(Hbase_TABLE))
    // Restrict the scan to the requested family and columns.
    val familyBytes = Bytes.toBytes(params(Hbase_TABLE_FAMILY))
    val fields: Array[String] = params(Hbase_TABLE_SELECT_FIELDS).split(SPERATOR)
    val scan: Scan = new Scan()
    scan.addFamily(familyBytes)
    fields.foreach(field => scan.addColumn(familyBytes, Bytes.toBytes(field)))
    // TableInputFormat expects the Scan serialized as Base64-encoded protobuf.
    conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
    // 3. Load (rowkey, Result) pairs from HBase.
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] =
      sqlContext.sparkContext.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result]
      )
    // 4. Decode each requested cell to a String; Bytes.toString(null) is not
    //    safe, so absent cells become null explicitly.
    hbaseRDD.map { case (_, result) =>
      val values: Seq[String] = fields.map { field =>
        val value: Array[Byte] = result.getValue(familyBytes, Bytes.toBytes(field))
        if (value == null) null else Bytes.toString(value)
      }
      Row.fromSeq(values)
    }
  }

  /**
   * Writes `data` into the configured table/family, one Put per Row, using
   * the "rowKeyColumn" option as the row key.
   *
   * NOTE: `overwrite` is ignored — HBase Puts are upserts, so existing cells
   * for the same rowkey/column are replaced and others are left untouched.
   */
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    // 1. HBase client config. BUGFIX: use HBaseConfiguration.create() (which
    //    loads hbase-default/hbase-site) instead of a bare new Configuration(),
    //    and the same corrected ZK key/value pairs as in buildScan.
    val conf: Configuration = hbaseConf()
    // 2. Table to write.
    conf.set(TableOutputFormat.OUTPUT_TABLE, params(Hbase_TABLE))
    // 3. Convert each Row into (rowkey, Put).
    val columns: Array[String] = data.columns
    val rowKeyColumn = params(Hbase_TABLE_ROWKEY_NAME)
    val family = params(Hbase_TABLE_FAMILY)
    val putsRDD: RDD[(ImmutableBytesWritable, Put)] =
      data.rdd.map { row =>
        val rowKey: String = row.getAs[String](rowKeyColumn)
        val put = new Put(Bytes.toBytes(rowKey))
        val familyBytes = Bytes.toBytes(family)
        columns.foreach { column =>
          val cell = row.getAs[String](column)
          // Skip null cells: Bytes.toBytes(null) would throw a NullPointerException.
          if (cell != null) {
            put.addColumn(familyBytes, Bytes.toBytes(column), Bytes.toBytes(cell))
          }
        }
        (new ImmutableBytesWritable(put.getRow), put)
      }
    // 4. Persist via TableOutputFormat (the path is required by the API but unused).
    putsRDD.saveAsNewAPIHadoopFile(
      s"/apps/hbase/output-${System.currentTimeMillis()}",
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[TableOutputFormat[ImmutableBytesWritable]],
      conf
    )
  }
}
(3)注册数据源
在项目【 resources 】目录下创建目录【 META-INF/services 】,并在其中创建文件
【 org.apache.spark.sql.sources.DataSourceRegister 】,文件内容为数据源主类的全限定名:
com.yyds.tags.spark.hbase.DefaultSource

(4)测试
package com.yyds.tags.hbase.read

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * Smoke test for the custom "hbase" data source: reads the `detail` family of
 * `tbl_users` into a DataFrame, then writes the same rows back into the
 * `info` family keyed by the `id` column.
 */
object HbaseSQLTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      // Kryo is faster/more compact than Java serialization for shuffled data.
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Read: the options map to HbaseRelation's params (zkHosts/zkPort/...).
    val usersDF: DataFrame = spark.read
      .format("hbase")
      .option("zkHosts", "192.168.42.7")
      .option("zkPort", "2181")
      .option("hbaseTable", "tbl_users")
      .option("family", "detail")
      .option("selectFields", "id,gender")
      .load()

    usersDF.printSchema()
    // Cache before the two actions (show + write) so HBase is scanned once.
    usersDF.cache()
    usersDF.show(10, truncate = false)

    // Write: SaveMode is accepted by the API but HBase Puts are upserts,
    // so Overwrite behaves like an upsert of the listed columns.
    usersDF.write
      .mode(SaveMode.Overwrite)
      .format("hbase")
      .option("zkHosts", "192.168.42.7")
      .option("zkPort", "2181")
      .option("hbaseTable", "tbl_users")
      .option("family", "info")
      .option("rowKeyColumn", "id")
      .save()

    spark.stop()
  }
}



