栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

记一次·Spark读Hbase

记一次·Spark读Hbase

记一次·Spark读Hbase 一 、背景

过年回来,数仓发现hive的一个表丢数据了,需要想办法补数据。这个表是flume消费kafka写hive。但是kafka里只保存最近7天数据,有部分数据kafka里已经没有了。不过这份数据会同时被消费到Hbase内存储一份,并且Hbase内的数据是正常的。所以这次任务是读Hbase数据写Hive表。

    Hbase表内,只有一个列族info,列族内只有一个列value。value为“|”分割的字段字符串。“|”切割字符串得到最后一个字段为数据的时间戳,记为ts。hive根据ts转换后的yyyyMMdd作为分区。
二、计划
    计划使用Spark,通过newAPIHadoopRDD的方式读Hbase数据到内存。按照Hbase中的timestamp过滤数据,取20220127到20220201之间的数据。写入hdfs文件
三、代码
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HbaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SparkScanHbase {

  def main(args: Array[String]): Unit = {
    println(args)
    val startTS: String = args(0)
    val endTS: String = args(1)
    val path: String = args(2)

    val spark: SparkSession =
      SparkSession.builder().appName("SparkHbaseRDD")
        .config("spark.kryoserializer.buffer","1024")
        .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    val tableName = "table"

    val hbaseConf: Configuration = HbaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName) // TableInputFormat在org.apache.hbase.hbase-mapreduce包
    hbaseConf.set(TableInputFormat.SCAN_TIMERANGE_START, startTS)
    hbaseConf.set(TableInputFormat.SCAN_TIMERANGE_END, endTS)

    println("开始运行……")
    //读取数据并转化成rdd TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    val filterRdd: RDD[String] = hbaseRDD
      .map(rdd => {
        val result: Result = rdd._2
        //获取行键
        //通过列族和列名获取列
        val value: String = Bytes.toString(result.value())
        //        println(value.substring(value.length - 20, value.length))
        value
      })

    println(s"filterRdd.count:${filterRdd.count()}")

    filterRdd.saveAsTextFile(path)

    filterRdd.collect()

    spark.stop()
    println("程序结束!")
  }

}

在TableInputFormat类中,可以查看可以配置的扫面Hbase的配置条件

  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
  
  public static final String SCAN = "hbase.mapreduce.scan";
  
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
  
  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";

        2.0.0
        1.2.0
        2.6.0
        2.11.12
        1.7
        UTF-8
    

    
        
            org.apache.spark
            spark-sql_2.11
            ${spark.version}
            provided
        
        
            org.apache.hbase
            hbase-client
            ${hbase.version}
        
        
            org.apache.hbase
            hbase-common
            ${hbase.version}
        
        
            org.apache.hbase
            hbase-server
            ${hbase.version}
        
    
四、遇到的坑

刚开始调试的时候,忘记了最基本的一点:spark程序,需要有行动算子,才能生成执行计划并执行,于是乎n次调试运行代码都没有任何反应直接退出0.0。代码Hbase版本和服务器上Hbase版本不一致,导致扫描数据报错。

org.apache.hadoop.hbase.DoNotRetryIOException:
/192.168.x.x:16020 is unable to read call parameter from client 10.47.x.x
五、参考

spark 存入hbase_spark将数据写入hbase以及从hbase读取数据

spark读取Hbase数据的一次坑爹经历

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746527.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号