过年回来,数仓发现hive的一个表丢数据了,需要想办法补数据。这个表是flume消费kafka写hive。但是kafka里只保存最近7天数据,有部分数据kafka里已经没有了。不过这份数据会同时被消费到Hbase内存储一份,并且Hbase内的数据是正常的。所以这次任务是读Hbase数据写Hive表。
- Hbase表内,只有一个列族info,列族内只有一个列value。value为“|”分割的字段字符串。“|”切割字符串得到最后一个字段为数据的时间戳,记为ts。hive根据ts转换后的yyyyMMdd作为分区。
- 计划使用Spark,通过newAPIHadoopRDD的方式读Hbase数据到内存。按照Hbase中的timestamp过滤数据,取20220127到20220201之间的数据。写入hdfs文件
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HbaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object SparkScanHbase {
def main(args: Array[String]): Unit = {
println(args)
val startTS: String = args(0)
val endTS: String = args(1)
val path: String = args(2)
val spark: SparkSession =
SparkSession.builder().appName("SparkHbaseRDD")
.config("spark.kryoserializer.buffer","1024")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val tableName = "table"
val hbaseConf: Configuration = HbaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181
hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName) // TableInputFormat在org.apache.hbase.hbase-mapreduce包
hbaseConf.set(TableInputFormat.SCAN_TIMERANGE_START, startTS)
hbaseConf.set(TableInputFormat.SCAN_TIMERANGE_END, endTS)
println("开始运行……")
//读取数据并转化成rdd TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
hbaseConf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
val filterRdd: RDD[String] = hbaseRDD
.map(rdd => {
val result: Result = rdd._2
//获取行键
//通过列族和列名获取列
val value: String = Bytes.toString(result.value())
// println(value.substring(value.length - 20, value.length))
value
})
println(s"filterRdd.count:${filterRdd.count()}")
filterRdd.saveAsTextFile(path)
filterRdd.collect()
spark.stop()
println("程序结束!")
}
}
在TableInputFormat类中,可以查看可以配置的扫面Hbase的配置条件
public static final String INPUT_TABLE = "hbase.mapreduce.inputtable"; private static final String SPLIT_TABLE = "hbase.mapreduce.splittable"; public static final String SCAN = "hbase.mapreduce.scan"; public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start"; public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop"; public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family"; public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns"; public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp"; public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start"; public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end"; public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions"; public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks"; public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows"; public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize"; public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
四、遇到的坑2.0.0 1.2.0 2.6.0 2.11.12 1.7 UTF-8 org.apache.spark spark-sql_2.11 ${spark.version} provided org.apache.hbase hbase-client ${hbase.version} org.apache.hbase hbase-common ${hbase.version} org.apache.hbase hbase-server ${hbase.version}
刚开始调试的时候,忘记了最基本的一点:spark程序,需要有行动算子,才能生成执行计划并执行,于是乎n次调试运行代码都没有任何反应直接退出0.0。代码Hbase版本和服务器上Hbase版本不一致,导致扫描数据报错。
org.apache.hadoop.hbase.DoNotRetryIOException: /192.168.x.x:16020 is unable to read call parameter from client 10.47.x.x五、参考
spark 存入hbase_spark将数据写入hbase以及从hbase读取数据
spark读取Hbase数据的一次坑爹经历



