栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark-core从hbase中读写数据

spark-core从hbase中读写数据

spark交互Hbase

Spark可以从Hbase表中读写(Read/Write)数据,底层采用 TableInputFormat 和 TableOutputFormat 方式,与MapReduce与Hbase集成完全一样,使用相同输入格式InputFormat 和输出格式 OutputFoamt 。

1、写入数据
package com.yyds.tags.hbase.write


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HbaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object HbaseWriteTest {
  def main(args: Array[String]): Unit = {
    // a. 构建SparkContext实例对象
    val sparkConf = new SparkConf()
      .setAppName("SparkHbaseWrite")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Put])) // 注册哪些类型使用Kryo序列化, 最好注册RDD中类型
    val sc: SparkContext = new SparkContext(sparkConf)


    // b. 模拟数据集
    val tagsRDD: RDD[(String, String)] = sc.parallelize(
      List(("1001", "gender:男,job:教师"),
        ("1002", "gender:女,job:工人"),
        ("1003", "gender:男,job:学生"),
        ("1004", "gender:男,job:工人")
      ),
      numSlices = 2
    )


    // TODO:将RDD数据保存到Hbase表中,要求RDD数据类型为二元组,Key: ImmutableBytesWritable, Value:Put
    
    val datasRDD: RDD[(ImmutableBytesWritable, Put)] =
    tagsRDD.map { case (userId, tags) =>
      // a. 构建RowKey
      val rowKey: Array[Byte] = Bytes.toBytes(userId)
      // b. 构建put对象
      val put = new Put(rowKey)
      // 设置列
      put.addColumn(
        Bytes.toBytes("user"),
        Bytes.toBytes("userId"),
        Bytes.toBytes(userId)
      )
      tags.split(",").foreach { tag =>
        val Array(field, value) = tag.split(":")
        put.addColumn(
          Bytes.toBytes("user"),
          Bytes.toBytes(field),
          Bytes.toBytes(value)
        )
      }
      (new ImmutableBytesWritable(rowKey), put)
    }

    // 1. 设置Hbase依赖Zookeeper相关配置信息
    val conf: Configuration = HbaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum","192.168.42.7")
    conf.set("hbase.zookeeper.property.clientPort","2181")
    conf.set("zookeeper.znode.parent","/hbase")

    // 2. 数据写入表的名称
    conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_tags")


    datasRDD.saveAsNewAPIHadoopFile(
      s"datas/hbase/output-${System.nanoTime()}",
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[TableOutputFormat[ImmutableBytesWritable]],
      conf
    )

    // 应用结束,关闭资源
    sc.stop()

  }
}

2、读取数据
package com.yyds.tags.hbase.read

import org.apache.hadoop.hbase.{CellUtil, HbaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object HbaseReadTest {

  def main(args: Array[String]): Unit = {
    // 创建SparkContext实例对象
    val sparkConf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("HbaseReadTest")
      // 设置使用Kryo序列
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册哪些类型使用Kryo序列化, 最好注册RDD中类型
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))


    val sc: SparkContext = SparkContext.getOrCreate(sparkConf)
    // 读取数据
    
    // 1. 读取配置信息,加载HbaseClient配置(主要ZK地址和端口号)
    val conf = HbaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.42.7")


    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")

    // 2. 设置表的名称
    conf.set(TableInputFormat.INPUT_TABLE, "tbl_users")
    // 3. 从Hbase表加载数据
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] =
      sc.newAPIHadoopRDD(conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])
    println(s"count = ${hbaseRDD.count()}")
    hbaseRDD.take(2).foreach {
      case (_, result) => println(s"RowKey = ${Bytes.toString(result.getRow)}")
        for (cell <- result.rawCells()) {
          // 列簇CF
          val cf = Bytes.toString(CellUtil.cloneFamily(cell))
          // 列名称
          val column = Bytes.toString(CellUtil.cloneQualifier(cell))
          //列的值
          val value = Bytes.toString(CellUtil.clonevalue(cell))

          println(s"t ${cf}:${column} = ${value}, version -> ${cell.getTimestamp}")
        }
    }
    // 应用结束,关闭资源
    sc.stop()

  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752855.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号