栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

scala版本,spark将HDFS上的数据同步到hbase

scala版本,spark将HDFS上的数据同步到hbase

1 spark将HDFS上的数据同步到hbase

我们也知道hbase底层的数据源是hFile。
将hdfs数据转换为hfile, 通过bukload快速导入hbase ,当然里面有很多坑.
比如 : 版本不一致.
还有就是本地版本和集群版本不一致导致class不存在.写hbase代码最好是使用java和scala。我这里使用的是spark2.4 + hbase 2.1 切记不同版本使用的方法不一样。
我这里也保留了一些老的class希望对你有用。

2 代码

话不多说直接上代码。我这里只是同步,所以都是写在了一个类上面。

package com.test.task

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}


object Hdfs2Hbase {

    var cdhPath = ""
    var zookeeperQuorum = ""
    var dataSourcePath = ""
    var hdfsRootPath = ""
    var hFilePath = ""
    val tableName = "api:real_time_label"
    val familyName = "baseInfo"

    def main(args: Array[String]): Unit = {

        //设置用户
        System.setProperty("HADOOP_USER_NAME", "say")

        //  运行shell 传参执行环境
        //  生产运行记得设置运行参数
        if (args.length >= 1) {
            println("设置参数,运行环境:"+args(0))
            if ("online".equals(args(0))) {
                cdhPath = "hdfs://say-hdfs-cluster"
                zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
                dataSourcePath = cdhPath+"/user/say/hbase/txt/"
                hdfsRootPath = cdhPath+"/user/say/"
                hFilePath = cdhPath+"/user/say/hbase/hfile"
            } else {
                cdhPath = "hdfs://say-cdh-master02.net:8020"
                zookeeperQuorum = "192.168.2.101:2181,192.168.2.102:2181,192.168.2.103:2181"
                dataSourcePath = cdhPath+"/user/say/hbase/txt/"
                hdfsRootPath = cdhPath+"/user/say/"
                hFilePath = cdhPath+"/user/say/hbase/hfile"
            }
        } else {
            println("运行环境: test")
            cdhPath = "hdfs://say-cdh-master02.net:8020"
            zookeeperQuorum = "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
            dataSourcePath = cdhPath+"/user/say/hbase/txt/"
            hdfsRootPath = cdhPath+"/user/say/"
            hFilePath = cdhPath+"/user/say/hbase/hfile"
        }

        val sparkConf = new SparkConf()
          .setAppName("hive2Hbase")
//          .setMaster("local[*]")    //本地运行打开,也可以设置参数 ,记得设置运行参数
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))

        val sc = new SparkContext(sparkConf)
        val hadoopConf = new Configuration()
        hadoopConf.set("fs.defaultFS", hdfsRootPath)
        hadoopConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
        val fileSystem = FileSystem.get(new URI(cdhPath), hadoopConf)
        val hbaseConf = HbaseConfiguration.create(hadoopConf)
        println("我在这里")
        hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum)
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
        val hbaseConn = ConnectionFactory.createConnection(hbaseConf)

        val admin = hbaseConn.getAdmin
        println("连接成功啦~")

        // 0. 准备程序运行的环境
        // 如果 Hbase 表不存在,就创建一个新表
        if (!admin.tableExists(TableName.valueOf(tableName))) {
            val desc = new HTableDescriptor(TableName.valueOf(tableName))
            val hcd = new HColumnDescriptor(familyName)
            desc.addFamily(hcd)
            admin.createTable(desc)
        }
        // 如果存放 HFile文件的路径已经存在,就删除掉
        if (fileSystem.exists(new Path(hFilePath))) {
            fileSystem.delete(new Path(hFilePath), true)
        }

        // 1. 清洗需要存放到 HFile 中的数据,rowKey 一定要排序,否则会报错:
        // java.io.IOException: Added a key not lexically larger than previous.

        val data = sc.textFile(dataSourcePath)
          .map(str => {
              val valueStr: Array[String] = str.split("\|")
              val rowkey = valueStr(0)
              // 处理数据的逻辑
              (rowkey, valueStr)
          })
          .map(tuple => {
              val kv = new KeyValue(Bytes.toBytes(tuple._1), Bytes.toBytes(familyName), Bytes.toBytes(tuple._2(1)), Bytes.toBytes(tuple._2(2)))
              (new ImmutableBytesWritable(Bytes.toBytes(tuple._1)), kv)
          })

        // 2. Save Hfiles on HDFS
        val table: Table = hbaseConn.getTable(TableName.valueOf(tableName))
        val job = Job.getInstance(hbaseConf)
        job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setMapOutputValueClass(classOf[KeyValue])
        val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
//        println(table.getTableDescriptor)
//        println(table)

        HFileOutputFormat2.configureIncrementalLoadMap(job, table.getTableDescriptor)

        val jobConfiguration = job.getConfiguration
        jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", hFilePath)
        data
          .sortBy(x => (x._1, x._2.getKeyString), ascending = true)
          .saveAsNewAPIHadoopDataset(jobConfiguration)
        println("成功生成HFILE")
//这里就是版本不同而出现的问题.
//        filedata.saveAsNewAPIHadoopFile(hFilePath,classOf[ImmutableBytesWritable],Class[KeyValue],Class[HFileOutputFormat2],hbaseConf)
//        filedata.saveAsNewAPIHadoopFile(
//              hFilePath,
//              classOf[ImmutableBytesWritable],
//              classOf[KeyValue],
//              classOf[HFileOutputFormat2],
//              hadoopConf
//          )

        //  3. Bulk load Hfiles to Hbase
        val bulkLoader = new LoadIncrementalHFiles(hbaseConf)

        bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

        hbaseConn.close()
        fileSystem.close()
        sc.stop()
    }
}
3 pom

为防止大家也和我出现版本冲突 而导致not found class 的问题.我给大家贴出pom.xml .我有很多注释,也是防止不一致,你就单独使用依赖 .可能还需要建一个build目录.



    4.0.0

    com.test.task
    Hdfs2Hbase
    1.0-SNAPSHOT


    
        2.4.0
        2.11.12
        2.1.0
        3.0.0
    

    

        
            org.scala-lang
            scala-library
            ${scala.version}
        





        
            org.apache.spark
            spark-core_2.11
            ${spark.version}
            
                
                    org.apache.hadoop
                    hadoop-client
                
                
                    org.apache.hadoop
                    hadoop-hdfs
                
            
        

        
            org.apache.hbase
            hbase-it
            ${hbase.version}
        






























        










    


    
        src/main/scala
        src/test/scala
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.5.1
                
                    1.8
                    1.8
                    UTF-8
                
            
            
                org.scala-tools
                maven-scala-plugin
                
                    
                        
                            compile
                            testCompile
                        
                    
                
                
                    ${scala.version}
                    
                        -target:jvm-1.8
                    
                    
                        -Xss2048K
                    
                

            


            
                org.apache.maven.plugins
                maven-dependency-plugin
                
                    
                        copy-dependencies
                        prepare-package
                        
                            copy-dependencies
                        
                        
                            ${project.build.directory}/lib
                            false
                            false
                            true
                        
                    
                
            

            
                maven-assembly-plugin
                
                    
                        jar-with-dependencies
                    
                    
                        
                            
                        
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
            
                org.apache.maven.plugins
                maven-assembly-plugin
                
                    false
                    ${project.artifactId}-${ver}
                    
                        src/build/package.xml
                    
                
                
                    
                        single
                        package
                        
                            single
                        
                    
                
            
            
                org.apache.maven.plugins
                maven-antrun-plugin
                1.8
                
                    
                        del-lib-jar
                        package
                        
                            run
                        
                        
                            
                                delete lib jar...
                                
                                
                                    
                                
                            
                        
                    
                
            
        
        
            
                src/main/resources
                
                    ***.sh
                
                false
            
            
                src/main/resources/env/${env}
            
            
                src/main/scala
                
                    **/*.*
                
                false
            
        
    
    
        
            
                org.scala-tools
                maven-scala-plugin
                
                    ${scala.version}
                
            
        
    
    
        
            prd
            
                true
            
            
                prd
                2.0.0-prd
            
        
        
            dev
            
                dev
                1.0.0-SNAPSHOT
            
        
        
            test
            
                test
                1.0.0-SNAPSHOT
            
        
        
            rc
            
                rc
                1.0.0-SNAPSHOT
            
        
    

    
        
            aliyun
            http://maven.aliyun.com/nexus/content/groups/public/
            
                true
            
            
                false
            
        
    


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758279.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号