栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

sparksql自定义数据源

sparksql自定义数据源

sparksql自定义数据源

Spark SQL 开放了一系列用于接入外部数据源的接口供开发者实现,这些接口定义在 org.apache.spark.sql.sources 包下的 interfaces.scala 文件中。

(1)DefaultSource
package com.yyds.tags.spark.hbase

import org.apache.spark.sql.{Dataframe, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{baseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}


/**
 * Entry point of the custom HBase data source for Spark SQL.
 *
 * Registers the short name `hbase` and builds [[HbaseRelation]] instances for
 * both the read path (`spark.read.format("hbase")`) and the write path
 * (`df.write.format("hbase")`).
 */
class DefaultSource extends RelationProvider with CreatableRelationProvider with DataSourceRegister {

  // NOTE(review): `baseRelation` and `Dataframe` are spelled exactly as in this
  // listing's import lines. Spark's types are presumably `BaseRelation` and
  // `DataFrame` -- confirm, and fix the imports and these usages together.

  /** Separator between column names inside the `selectFields` option. */
  val SPERATOR: String = ","

  /** Name of the option that lists the columns to read. */
  val Hbase_TABLE_SELECT_FIELDS: String = "selectFields"

  /** Short alias of this data source. */
  override def shortName(): String = "hbase"

  /**
   * Read path: builds a relation whose schema has one nullable string column
   * per name listed in the `selectFields` option.
   *
   * @param sqlContext active SQL context
   * @param parameters options supplied by the caller; `selectFields` is looked
   *                   up with `Map.apply`, so a missing option fails there
   * @return the relation that is later asked to scan the table
   */
  override def createRelation(
                               sqlContext: SQLContext,
                               parameters: Map[String, String]
                             ): baseRelation = {
    // 1. Derive the schema from the requested column names.
    val columns: Array[StructField] = parameters(Hbase_TABLE_SELECT_FIELDS)
      .split(SPERATOR)
      .map(field => StructField(field, StringType, nullable = true))
    val schema: StructType = StructType(columns)

    // 2. Create and return the relation.
    new HbaseRelation(sqlContext, parameters, schema)
  }

  /**
   * Write path: builds a relation that uses the schema of `data`, inserts
   * `data` through it and returns it.
   *
   * @param sqlContext active SQL context
   * @param mode       save mode requested by the caller; forwarded to `insert`
   *                   as `overwrite = true` only for `SaveMode.Overwrite`
   * @param parameters options supplied by the caller
   * @param data       rows to save
   * @return the relation the data was written through
   */
  override def createRelation(
                               sqlContext: SQLContext,
                               mode: SaveMode,
                               parameters: Map[String, String],
                               data: Dataframe
                             ): baseRelation = {
    // 1. Create the relation with the schema of the data being saved.
    val relation = new HbaseRelation(sqlContext, parameters, data.schema)

    // 2. Save the data, honouring the requested mode instead of always passing `true`.
    relation.insert(data, overwrite = mode == SaveMode.Overwrite)

    // 3. Return the relation.
    relation
  }

}

(2)HbaseRelation
package com.yyds.tags.spark.hbase

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HbaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{base64, Bytes}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataframe, Row, SQLContext}
import org.apache.spark.sql.sources.{baseRelation, InsertableRelation, TableScan}
import org.apache.spark.sql.types.StructType


/**
 * Relation that reads an HBase table as rows of strings and writes a
 * DataFrame back to an HBase table.
 *
 * NOTE(review): `baseRelation`, `Dataframe`, `HbaseConfiguration` and `base64`
 * are spelled exactly as in this listing's import lines. The real types are
 * presumably `BaseRelation`, `DataFrame`, `HBaseConfiguration` and `Base64`
 * -- confirm, and fix the imports and these usages together.
 *
 * @param context    SQL context used to reach the SparkContext
 * @param params     caller-supplied options; read with the keys `zkHosts`,
 *                   `zkPort`, `hbaseTable`, `family`, `selectFields` (scan)
 *                   and `rowKeyColumn` (insert)
 * @param userSchema schema reported for this relation
 */
case class HbaseRelation(context: SQLContext,
                    params: Map[String, String],
                    userSchema: StructType)
                                   extends baseRelation
                                   with TableScan
                                   with InsertableRelation
                                   with Serializable {

  // HBase client property names (`*_KEY`) and the option names (`*_VALUE`)
  // under which callers supply their values.
  val Hbase_ZK_QUORUM_KEY: String = "hbase.zookeeper.quorum"
  val Hbase_ZK_QUORUM_VALUE: String = "zkHosts"
  val Hbase_ZK_PORT_KEY: String = "hbase.zookeeper.property.clientPort"
  val Hbase_ZK_PORT_VALUE: String = "zkPort"
  val Hbase_TABLE: String = "hbaseTable"
  val Hbase_TABLE_FAMILY: String = "family"
  val SPERATOR: String = ","
  val Hbase_TABLE_SELECT_FIELDS: String = "selectFields"
  val Hbase_TABLE_ROWKEY_NAME: String = "rowKeyColumn"

  /** SQL context this relation was created with. */
  override def sqlContext: SQLContext = context

  /** Schema supplied at construction time. */
  override def schema: StructType = userSchema

  /**
   * Copies the ZooKeeper quorum and client port from the caller's options
   * (`zkHosts`, `zkPort`) into the HBase client properties of `conf`.
   */
  private def applyZookeeperSettings(conf: Configuration): Unit = {
    conf.set(Hbase_ZK_QUORUM_KEY, params(Hbase_ZK_QUORUM_VALUE))
    conf.set(Hbase_ZK_PORT_KEY, params(Hbase_ZK_PORT_VALUE))
  }

  /**
   * Scans the configured table and returns one Row per HBase result. Each row
   * holds the values of the `selectFields` columns, in that order, decoded
   * with `Bytes.toString`.
   */
  override def buildScan(): RDD[Row] = {
    // 1. HBase client configuration (ZooKeeper address and port).
    val conf = HbaseConfiguration.create()
    applyZookeeperSettings(conf)
    conf.set("zookeeper.znode.parent", "/hbase")

    // 2. Table to read.
    conf.set(TableInputFormat.INPUT_TABLE, params(Hbase_TABLE))

    // Column family and qualifiers to read. These are resolved once, as local
    // values, so the closure below does not reference members of this relation.
    val familyBytes: Array[Byte] = Bytes.toBytes(params(Hbase_TABLE_FAMILY))
    val qualifiers: Array[Array[Byte]] = params(Hbase_TABLE_SELECT_FIELDS)
      .split(SPERATOR)
      .map(field => Bytes.toBytes(field))

    val scan: Scan = new Scan()
    scan.addFamily(familyBytes)
    qualifiers.foreach(qualifier => scan.addColumn(familyBytes, qualifier))

    conf.set(TableInputFormat.SCAN, base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

    // 3. Load the table as (row key, result) pairs.
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] =
      sqlContext.sparkContext.newAPIHadoopRDD(conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])

    // 4. Turn every result into a Row: look up each requested column and
    //    decode its bytes to a string.
    hbaseRDD.map {
      case (_, result) =>
        val values: Array[String] = qualifiers.map { qualifier =>
          Bytes.toString(result.getValue(familyBytes, qualifier))
        }
        Row.fromSeq(values.toSeq)
    }
  }

  /**
   * Writes every row of `data` to the configured table as one Put. The row key
   * is the value of the `rowKeyColumn` column; every non-null column value is
   * stored under the configured family, using the column name as qualifier.
   *
   * @param data      rows to write; columns are read with `getAs[String]`
   * @param overwrite not used by this implementation
   */
  override def insert(data: Dataframe, overwrite: Boolean): Unit = {
    // 1. ZooKeeper settings of the HBase cluster.
    val conf: Configuration = new Configuration()
    applyZookeeperSettings(conf)

    // 2. Table to write to.
    conf.set(TableOutputFormat.OUTPUT_TABLE, params(Hbase_TABLE))

    // 3. Convert rows to Puts. Everything the closure needs is resolved here,
    //    once, so it does not reference members of this relation.
    val columns: Array[String] = data.columns
    val rowKeyColumn: String = params(Hbase_TABLE_ROWKEY_NAME)
    val familyBytes: Array[Byte] = Bytes.toBytes(params(Hbase_TABLE_FAMILY))

    val putsRDD: RDD[(ImmutableBytesWritable, Put)] =
      data.rdd.map { row =>
        val rowKey: String = row.getAs[String](rowKeyColumn)
        val put = new Put(Bytes.toBytes(rowKey))
        columns.foreach { column =>
          // Null cells are skipped so that no null reaches Bytes.toBytes.
          Option(row.getAs[String](column)).foreach { value =>
            put.addColumn(familyBytes, Bytes.toBytes(column), Bytes.toBytes(value))
          }
        }
        (new ImmutableBytesWritable(put.getRow), put)
      }

    // 4. Save through TableOutputFormat.
    putsRDD.saveAsNewAPIHadoopFile(
      s"/apps/hbase/output-${System.currentTimeMillis()}",
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[TableOutputFormat[ImmutableBytesWritable]],
      conf
    )
  }
}

(3)注册数据源

Spark 通过 Java 的 ServiceLoader 机制查找 DataSourceRegister 的实现类,所以需要在项目【 resources 】目录下创建目录【 META-INF/services 】(注意 META-INF 必须大写),并且创建文件
【 org.apache.spark.sql.sources.DataSourceRegister 】,内容为数据源主类的全限定名:

com.yyds.tags.spark.hbase.DefaultSource
(4)测试
package com.yyds.tags.hbase.read

import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}


/**
 * Manual smoke test of the `hbase` data source: reads `id` and `gender` from
 * family `detail` of table `tbl_users`, prints them, then writes the same rows
 * back to family `info` of `tbl_users` using `id` as the row key.
 */
object HbaseSQLTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Connection options shared by the read and the write.
    val connectionOptions: Map[String, String] = Map(
      "zkHosts" -> "192.168.42.7",
      "zkPort" -> "2181",
      "hbaseTable" -> "tbl_users"
    )

    try {
      // Read data.
      val usersDF = spark.read
        .format("hbase")
        .options(connectionOptions)
        .option("family", "detail")
        .option("selectFields", "id,gender")
        .load()
      usersDF.printSchema()
      usersDF.cache()
      usersDF.show(10, truncate = false)

      // Save data.
      usersDF.write
        .mode(SaveMode.Overwrite)
        .format("hbase")
        .options(connectionOptions)
        .option("family", "info")
        .option("rowKeyColumn", "id")
        .save()
    } finally {
      // Stop the session even when the read or the write fails.
      spark.stop()
    }
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761845.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号