我们知道,HBase 底层的存储文件格式是 HFile。
将 HDFS 数据转换为 HFile,再通过 bulkload 快速导入 HBase,当然里面有很多坑。
比如 : 版本不一致.
还有就是本地依赖版本和集群版本不一致,导致 class 不存在。写 HBase 代码最好使用 Java 或 Scala。我这里使用的是 Spark 2.4 + HBase 2.1,切记不同版本可用的方法不一样。
我这里也保留了一些老的class希望对你有用。
话不多说直接上代码。我这里只是同步,所以都是写在了一个类上面。
package com.test.task
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
/**
 * One-shot sync job: reads pipe-delimited text from HDFS, writes it out as
 * sorted HFiles, then bulk-loads the HFiles into the HBase table `tableName`.
 *
 * Tested against Spark 2.4 + HBase 2.1 — the HFile/bulk-load APIs differ
 * between HBase versions, so keep client and cluster versions aligned.
 */
object Hdfs2Hbase {

  // Environment-dependent settings; populated by configureEnv() before use.
  var cdhPath = ""
  var zookeeperQuorum = ""
  var dataSourcePath = ""
  var hdfsRootPath = ""
  var hFilePath = ""
  // Target table ("namespace:table") and the single column family written to.
  val tableName = "api:real_time_label"
  val familyName = "baseInfo"

  /**
   * Selects the HDFS endpoint and ZooKeeper quorum from the first CLI
   * argument: "online" picks the production cluster, any other value picks
   * the staging cluster, and no argument at all falls back to the test setup.
   * The derived HDFS sub-paths are identical across environments.
   */
  private def configureEnv(args: Array[String]): Unit = {
    if (args.length >= 1) {
      println("设置参数,运行环境:" + args(0))
      if ("online".equals(args(0))) {
        cdhPath = "hdfs://say-hdfs-cluster"
        zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
      } else {
        cdhPath = "hdfs://say-cdh-master02.net:8020"
        zookeeperQuorum = "192.168.2.101:2181,192.168.2.102:2181,192.168.2.103:2181"
      }
    } else {
      println("运行环境: test")
      cdhPath = "hdfs://say-cdh-master02.net:8020"
      zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
    }
    // These paths were duplicated verbatim in every branch; derive them once.
    dataSourcePath = cdhPath + "/user/say/hbase/txt/"
    hdfsRootPath = cdhPath + "/user/say/"
    hFilePath = cdhPath + "/user/say/hbase/hfile"
  }

  def main(args: Array[String]): Unit = {
    // Run HDFS operations as this user.
    System.setProperty("HADOOP_USER_NAME", "say")
    // 生产运行记得设置运行参数
    configureEnv(args)

    val sparkConf = new SparkConf()
      .setAppName("hive2Hbase")
    // .setMaster("local[*]") // 本地运行打开,也可以设置参数,记得设置运行参数
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable]))
    val sc = new SparkContext(sparkConf)

    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", hdfsRootPath)
    hadoopConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
    val fileSystem = FileSystem.get(new URI(cdhPath), hadoopConf)

    // BUG FIX: the class is HBaseConfiguration (capital B); the original
    // "HbaseConfiguration" does not exist and fails to compile.
    val hbaseConf = HBaseConfiguration.create(hadoopConf)
    println("我在这里")
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum)
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin
    println("连接成功啦~")

    try {
      // 0. Create the target table if it does not exist yet.
      if (!admin.tableExists(TableName.valueOf(tableName))) {
        // NOTE: HTableDescriptor/HColumnDescriptor are deprecated in HBase 2.x
        // (TableDescriptorBuilder is the replacement) but still work here.
        val desc = new HTableDescriptor(TableName.valueOf(tableName))
        desc.addFamily(new HColumnDescriptor(familyName))
        admin.createTable(desc)
      }

      // Remove any stale HFile output left over from a previous run.
      if (fileSystem.exists(new Path(hFilePath))) {
        fileSystem.delete(new Path(hFilePath), true)
      }

      // 1. Parse the source rows into (rowkey, KeyValue) pairs. Rowkeys MUST
      //    be written in sorted order, otherwise HFile writing fails with:
      //    java.io.IOException: Added a key not lexically larger than previous.
      val data = sc.textFile(dataSourcePath)
        .map { line =>
          // BUG FIX: "\|" is an invalid escape sequence in a Scala string
          // literal; split() takes a regex, so the pipe must be "\\|".
          val fields = line.split("\\|")
          (fields(0), fields)
        }
        .map { case (rowkey, fields) =>
          val kv = new KeyValue(
            Bytes.toBytes(rowkey),
            Bytes.toBytes(familyName),
            Bytes.toBytes(fields(1)), // column qualifier
            Bytes.toBytes(fields(2))) // cell value
          (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), kv)
        }

      // 2. Write the sorted pairs out as HFiles on HDFS.
      val table: Table = hbaseConn.getTable(TableName.valueOf(tableName))
      val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
      val job = Job.getInstance(hbaseConf)
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setMapOutputValueClass(classOf[KeyValue])
      HFileOutputFormat2.configureIncrementalLoadMap(job, table.getTableDescriptor)
      val jobConfiguration = job.getConfiguration
      jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", hFilePath)
      data
        .sortBy(x => (x._1, x._2.getKeyString), ascending = true)
        .saveAsNewAPIHadoopDataset(jobConfiguration)
      println("成功生成HFILE")

      // 3. Bulk-load the generated HFiles into the HBase table.
      // (saveAsNewAPIHadoopFile variants behave differently across HBase
      // versions — saveAsNewAPIHadoopDataset above is the 2.1-safe path.)
      val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
      bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)
    } finally {
      // BUG FIX: release cluster resources even when the job fails part-way;
      // the original leaked the admin handle and skipped cleanup on error.
      admin.close()
      hbaseConn.close()
      fileSystem.close()
      sc.stop()
    }
  }
}
3. pom.xml
为防止大家也和我一样遇到版本冲突导致 class not found 的问题,我把 pom.xml 贴出来。里面保留了不少注释,同样是为了防止版本不一致;你也可以只单独使用其中的依赖。另外可能还需要建一个 build 目录。
4.0.0 com.test.task Hdfs2Hbase 1.0-SNAPSHOT 2.4.0 2.11.12 2.1.0 3.0.0 org.scala-lang scala-library ${scala.version} org.apache.spark spark-core_2.11 ${spark.version} org.apache.hadoop hadoop-client org.apache.hadoop hadoop-hdfs org.apache.hbase hbase-it ${hbase.version} src/main/scala src/test/scala org.apache.maven.plugins maven-compiler-plugin 3.5.1 1.8 1.8 UTF-8 org.scala-tools maven-scala-plugin compile testCompile ${scala.version} -target:jvm-1.8 -Xss2048K org.apache.maven.plugins maven-dependency-plugin copy-dependencies prepare-package copy-dependencies ${project.build.directory}/lib false false true maven-assembly-plugin jar-with-dependencies make-assembly package single org.apache.maven.plugins maven-assembly-plugin false ${project.artifactId}-${ver} src/build/package.xml single package single org.apache.maven.plugins maven-antrun-plugin 1.8 del-lib-jar package run delete lib jar... src/main/resources ***.sh false src/main/resources/env/${env} src/main/scala **/*.* false org.scala-tools maven-scala-plugin ${scala.version} prd true prd 2.0.0-prd dev dev 1.0.0-SNAPSHOT test test 1.0.0-SNAPSHOT rc rc 1.0.0-SNAPSHOT aliyun http://maven.aliyun.com/nexus/content/groups/public/ true false



