
Spark with HBase

I recently used Spark to process data and land it in HBase. This post briefly walks through the implementation, covering both single-table and multi-table writes.
saveAsNewAPIHadoopFile
We could also use HBase's Java API directly, but that is not covered here.
Let's first look at how to write data into HBase with the new Hadoop API via saveAsNewAPIHadoopFile.
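
The snippets below assume the Spark and HBase 1.x client libraries are on the classpath. Roughly these imports are needed (a sketch; your package layout may differ by HBase version):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{MultiTableOutputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes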
//1- Build an RDD of (ImmutableBytesWritable, Put) pairs
val hbaseInfoRDD = logDF.rdd.map(x => {
      val col_01 = x.getAs[String]("col_01")
      val col_02 = x.getAs[String]("col_02")

      val columns = scala.collection.mutable.HashMap[String, String]()
      columns.put("col_01", col_01)
      columns.put("col_02", col_02)

      // build the HBase rowkey yourself (a sketch follows this example)
      val rowkey = getRowKey("implement yourself")
      val put = new Put(Bytes.toBytes(rowkey)) // the Put object to store into HBase
      // add every column of this row to the "info" column family
      for ((k, v) <- columns) {
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(v))
      }

      (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put)
    })


//2- Call saveAsNewAPIHadoopFile to write into a single table
    val conf = new Configuration()
    conf.set("hbase.rootdir", "hdfs://hadoop001:8020/hbase")
    conf.set("hbase.zookeeper.quorum", "hadoop001:2181")
    // which table to write to
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    hbaseInfoRDD.saveAsNewAPIHadoopFile(
      "null", // TableOutputFormat ignores the output path, so any placeholder works
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[TableOutputFormat[ImmutableBytesWritable]],
      conf
    )
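
The post leaves getRowKey as an exercise. As one hypothetical sketch (the signature and salting scheme below are my assumption, not part of the original), a common pattern is to prefix a short hash of the business key so sequential keys spread across regions:

import java.security.MessageDigest

// Hypothetical rowkey builder: a 4-character MD5 prefix salts the business
// key so monotonically increasing keys do not hotspot a single region.
def getRowKey(businessKey: String): String = {
  val md5 = MessageDigest.getInstance("MD5")
    .digest(businessKey.getBytes("UTF-8"))
    .map("%02x".format(_))
    .mkString
  md5.substring(0, 4) + "_" + businessKey
}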
That covers writing to a single table. But what should we do when the data has to be routed into multiple HBase tables based on some condition?
//1- Build the RDD; this time the key of each pair carries the target table name
val hbaseInfoRDD = logDF.rdd.map(x => {
      val col_01 = x.getAs[String]("col_01")
      val day = x.getAs[String]("day")

      val columns = scala.collection.mutable.HashMap[String, String]()
      columns.put("col_01", col_01)
      columns.put("day", day)

      // build the HBase rowkey yourself, as before
      val rowkey = getRowKey("implement yourself")
      val put = new Put(Bytes.toBytes(rowkey)) // the Put object to store into HBase
      // add every column of this row to the "info" column family
      for ((k, v) <- columns) {
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(v))
      }

      // e.g. name tables by day: each day's data lands in that day's table
      val tableName = day

      (new ImmutableBytesWritable(Bytes.toBytes(tableName)), put)
    })

   //2- Call saveAsNewAPIHadoopFile with MultiTableOutputFormat for the multi-table write
    val conf = new Configuration()
    conf.set("hbase.rootdir", "hdfs://hadoop001:8020/hbase")
    conf.set("hbase.zookeeper.quorum", "hadoop001:2181")
    // no OUTPUT_TABLE here: MultiTableOutputFormat routes each Put by its key

    // save the data
    hbaseInfoRDD.saveAsNewAPIHadoopFile(
      "null",
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[MultiTableOutputFormat],
      conf
    )

Note that this only works when the target tables already exist.
What if we want missing tables to be created automatically?
Here is just one idea: subclass MultiTableOutputFormat and override its
getRecordWriter method. You will need to dig into the source code for the details.
//1- Subclass MultiTableOutputFormat
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyMultiTableOutputFormat extends MultiTableOutputFormat {

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // nothing to check: there is no output directory
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new TableOutputCommitter();
    }

    @Override
    public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // hand out our own writer, which can create missing tables on the fly
        return new MyMultiTableRecordWriter(HBaseConfiguration.create(conf),
                conf.getBoolean(WAL_PROPERTY, WAL_ON));
    }
}

//2- Implement your own RecordWriter. Start from the MultiTableRecordWriter
//   inner class of MultiTableOutputFormat; only the modified parts are shown
//   (imports mirror those of MultiTableOutputFormat).
public class MyMultiTableRecordWriter extends RecordWriter<ImmutableBytesWritable, Mutation> {
    private static final Log LOG = LogFactory.getLog(MyMultiTableRecordWriter.class);
    Connection connection;
    Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
    Configuration conf;
    boolean useWriteAheadLogging;

    // new: configuration key for the column family used when auto-creating tables
    public static final String FAMILY_NAME = "family";

    public MyMultiTableRecordWriter(Configuration conf, boolean useWriteAheadLogging) {
        LOG.debug("Created new MyMultiTableRecordWriter with WAL "
                + (useWriteAheadLogging ? "on" : "off"));
        this.conf = conf;
        this.useWriteAheadLogging = useWriteAheadLogging;
    }

    BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
        if (this.connection == null) {
            this.connection = ConnectionFactory.createConnection(conf);
        }

        if (!mutatorMap.containsKey(tableName)) {
            LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");

            TableName tableNameObj = TableName.valueOf(tableName.get());

            // create the table first, so the mutator never targets a missing table
            createIfNotExist(connection, tableNameObj);

            BufferedMutator mutator = connection.getBufferedMutator(tableNameObj);
            mutatorMap.put(tableName, mutator);
        }
        return mutatorMap.get(tableName);
    }

    // create the table with the configured column family if it does not exist yet
    private void createIfNotExist(Connection connection, TableName tableName) {
        try (Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(tableName)) {
                HTableDescriptor desc = new HTableDescriptor(tableName);
                String familyName = this.conf.get(FAMILY_NAME);
                desc.addFamily(new HColumnDescriptor(familyName));
                admin.createTable(desc);
            }
        } catch (Exception e) {
            LOG.error("failed to create table " + tableName, e);
        }
    }
    // write() and close() were elided in the original post; a reconstruction follows
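    // The bodies below are reconstructed from HBase 1.x's
    // MultiTableOutputFormat.MultiTableRecordWriter (a sketch; verify against
    // the source of your HBase version).
    @Override
    public void write(ImmutableBytesWritable tableName, Mutation action)
            throws IOException, InterruptedException {
        BufferedMutator mutator = getBufferedMutator(tableName);
        // Mutations are not immutable, so copy defensively before changing durability
        if (action instanceof Put) {
            Put put = new Put((Put) action);
            put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL : Durability.SKIP_WAL);
            mutator.mutate(put);
        } else if (action instanceof Delete) {
            mutator.mutate(new Delete((Delete) action));
        } else {
            throw new IllegalArgumentException("action must be either Delete or Put");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // flush and release every per-table mutator, then the shared connection
        for (BufferedMutator mutator : mutatorMap.values()) {
            mutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }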
}

//3- Usage
    // set the column family used when tables are auto-created
    conf.set(MyMultiTableRecordWriter.FAMILY_NAME, "indo_demo")

    // optionally turn off the WAL
    //conf.set(MultiTableOutputFormat.WAL_PROPERTY, "false")

    // save the data
    hbaseInfoRDD.saveAsNewAPIHadoopFile(
      "null",
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[MyMultiTableOutputFormat],
      conf
    )
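
To sanity-check the result, the rows can be read back with Spark's newAPIHadoopRDD and TableInputFormat. A minimal sketch, assuming a SparkContext sc and using "day_20210101" as a stand-in for one of the per-day table names:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// point the same conf at one of the freshly written tables (hypothetical name)
conf.set(TableInputFormat.INPUT_TABLE, "day_20210101")
val readRDD = sc.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)
// extract (rowkey, col_01) from the "info" column family and print a sample
readRDD.map { case (key, result) =>
  val col01 = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("col_01")))
  (Bytes.toString(key.get()), col01)
}.take(10).foreach(println)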
    