1、利用SparkSession从HBase中读取数据，转换为DataFrame
package com.yyds.tags.tools

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object HbaseTools {

  /**
   * Reads an HBase table into a DataFrame via TableInputFormat.
   *
   * All requested columns are read as strings; cells missing in HBase
   * come back as null (the schema marks every field nullable).
   *
   * @param spark  active SparkSession
   * @param zkHosts ZooKeeper quorum hosts of the HBase cluster
   * @param zkPort ZooKeeper client port
   * @param table  HBase table name to read
   * @param family column family holding the requested fields
   * @param fields qualifier names to read; they become the DataFrame columns
   * @return DataFrame with one StringType column per requested field
   */
  def read(spark: SparkSession, zkHosts: String, zkPort: String,
           table: String, family: String, fields: Seq[String]): DataFrame = {
    val sc: SparkContext = spark.sparkContext
    // 1. Build the HBase client configuration (ZooKeeper quorum, port, znode root)
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkHosts)
    conf.set("hbase.zookeeper.property.clientPort", zkPort)
    conf.set("zookeeper.znode.parent", "/hbase")
    // 2. Tell TableInputFormat which table to scan
    conf.set(TableInputFormat.INPUT_TABLE, table)
    // Restrict the scan to the requested family and qualifiers
    val scan: Scan = new Scan()
    scan.addFamily(Bytes.toBytes(family))
    fields.foreach { field =>
      scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(field))
    }
    // TableInputFormat expects the Scan serialized as Base64-encoded protobuf
    conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
    // 3. Load (rowkey, Result) pairs from the HBase table
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] =
      sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result]
      )
    // DataFrame = RDD[Row] + schema: extract each requested cell as a string
    val rowsRDD: RDD[Row] = hbaseRDD.map {
      case (_, result) =>
        val values: Seq[String] = fields.map { field =>
          val value: Array[Byte] = result.getValue(Bytes.toBytes(family), Bytes.toBytes(field))
          // Bytes.toString(null) yields null, matching the nullable schema below
          Bytes.toString(value)
        }
        Row.fromSeq(values)
    }
    // Every requested field becomes a nullable string column
    val schema: StructType = StructType(
      fields.map(field => StructField(field, StringType, nullable = true))
    )
    spark.createDataFrame(rowsRDD, schema)
  }
}
2、利用SparkSession把DataFrame存入到HBase中
package com.yyds.tags.tools

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object HbaseTools {

  /**
   * Writes a DataFrame to an HBase table via TableOutputFormat.
   *
   * Every column of the DataFrame (including the rowkey column) is written
   * as a string cell under the given column family. Null cell values are
   * skipped, since HBase has no notion of null and Bytes.toBytes(null)
   * would throw a NullPointerException.
   *
   * @param dataframe    source data; all columns must be readable as String
   * @param zkHosts      ZooKeeper quorum hosts of the HBase cluster
   * @param zkPort       ZooKeeper client port
   * @param table        target HBase table name (must already exist)
   * @param family       column family to write all columns into
   * @param rowKeyColumn DataFrame column whose value becomes the rowkey;
   *                     must be non-null for every row
   */
  def write(dataframe: DataFrame, zkHosts: String, zkPort: String,
            table: String, family: String, rowKeyColumn: String): Unit = {
    // 1. HBase client configuration: use HBaseConfiguration.create() so
    // hbase-default/hbase-site settings are loaded (a bare Configuration is not enough)
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkHosts)
    conf.set("hbase.zookeeper.property.clientPort", zkPort)
    // 2. Tell TableOutputFormat which table to write
    conf.set(TableOutputFormat.OUTPUT_TABLE, table)
    // 3. Convert each Row into a (rowkey, Put) pair
    val columns: Array[String] = dataframe.columns
    val putsRDD: RDD[(ImmutableBytesWritable, Put)] =
      dataframe.rdd.map { row =>
        // Rowkey comes from the designated column
        val rowKey: String = row.getAs[String](rowKeyColumn)
        val put = new Put(Bytes.toBytes(rowKey))
        val familyBytes = Bytes.toBytes(family)
        // Add every non-null column value as a cell under the family
        columns.foreach { column =>
          val value: String = row.getAs[String](column)
          if (value != null) {
            put.addColumn(
              familyBytes,
              Bytes.toBytes(column),
              Bytes.toBytes(value)
            )
          }
        }
        (new ImmutableBytesWritable(put.getRow), put)
      }
    // 4. Persist through the Hadoop OutputFormat API; the path is only a
    // staging location required by the API, the data goes to the HBase table
    putsRDD.saveAsNewAPIHadoopFile(
      s"/apps/hbase/$table-" + System.currentTimeMillis(),
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[TableOutputFormat[ImmutableBytesWritable]],
      conf
    )
  }
}
3、测试HbaseTools的读写功能
package com.yyds.tags.hbase.tools

import com.yyds.tags.tools.HbaseTools
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Smoke test for HbaseTools: reads rows from `tbl_users`, prints them,
 * then writes them back to `tbl_users_test` keyed by the `id` column.
 * Requires a reachable HBase/ZooKeeper at the configured address.
 */
object HbaseToolsTest {

  def main(args: Array[String]): Unit = {
    // Local SparkSession with Kryo serialization (needed for HBase writables)
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .config("spark.serializer",
        "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    // Read two columns of the `detail` family from tbl_users
    val df: DataFrame = HbaseTools.read(
      spark, "192.168.42.7", "2181",
      "tbl_users", "detail", Seq("id", "gender")
    )
    println(s"count = ${df.count()}")
    df.printSchema()
    df.show(100, truncate = false)

    // Write the same data into tbl_users_test under the `info` family
    HbaseTools.write(
      df, "192.168.42.7", "2181",
      "tbl_users_test", "info", "id"
    )

    spark.stop()
  }
}