栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021SC@SDUSC hbase代码分析(十四)BulkLoad功能

2021SC@SDUSC hbase代码分析(十四)BulkLoad功能

2021SC@SDUSC hbase源码分析(十四)BulkLoad功能

2021SC@SDUSC 2021SC@SDUSC
2021SC@SDUSC 2021SC@SDUSC

目录
  • 2021SC@SDUSC hbase源码分析(十四)BulkLoad功能
    • BulkLoad功能
    • BulkLoad核心流程
      • HFile生成阶段。
      • HFile导入阶段。
      • 核心流程到此结束

BulkLoad功能

在实际生产环境中,有这样一种场景:用户数据位于HDFS中,业务需要定期将这部分海量数据导入Hbase系统,以执行随机查询更新操作。这种场景如果调用写入API进行处理,极有可能会给RegionServer带来较大的写入压力:

  1. 引起RegionServer频繁flush,进而不断compact、split,影响集群稳定性。

  2. 引起RegionServer频繁GC,影响集群稳定性。

  3. 消耗大量CPU资源、带宽资源、内存资源以及IO资源,与其他业务产生资源竞争。

  4. 在某些场景下,比如平均KV大小比较大的场景,会耗尽RegionServer的处理线程,导致集群阻塞。

鉴于存在上述问题,Hbase提供了另一种将数据写入Hbase集群的方法——BulkLoad。BulkLoad首先使用MapReduce将待写入集群数据转换为HFile文件,再直接将这些HFile文件加载到在线集群中。显然,BulkLoad方案没有将写请求发送给RegionServer处理,可以有效避免上述一系列问题。

BulkLoad核心流程

从Hbase的视角来看,BulkLoad主要由两个阶段组成:

HFile生成阶段。

这个阶段会运行一个MapReduce任务,MapReduce的mapper需要自己实现,将HDFS文件中的数据读出来组装成一个复合KV,其中Key是rowkey,Value可以是KeyValue对象、Put对象甚至Delete对象;MapReduce的reducer由Hbase负责,通过方法HFileOutputFormat2.configureIncrementalLoad()进行配置,这个方法主要负责以下事项。

  1. 根据表信息配置一个全局有序的partitioner。

  2. 将partitioner文件上传到HDFS集群并写入DistributedCache。

  3. 设置reduce task的个数为目标表Region的个数。

  4. 设置输出key/value类满足HFileOutputFormat所规定的格式要求。

  5. 根据类型设置reducer执行相应的排序(KeyValueSortReducer或者PutSortReducer)。

这个阶段会为每个Region生成一个对应的HFile文件。

Partitioner相关的HRegionPartitioner类,他将rowKey和KeyValue组合起来:

public class HRegionPartitioner
implements Partitioner {
  private static final Logger LOG = LoggerFactory.getLogger(HRegionPartitioner.class);
  // Connection and locator are not cleaned up; they just die when partitioner is done.
  private Connection connection;
  private RegionLocator locator;
  private byte[][] startKeys;

  @Override
  public void configure(JobConf job) {
    try {
      this.connection = ConnectionFactory.createConnection(HbaseConfiguration.create(job));
      TableName tableName = TableName.valueOf(job.get(TableOutputFormat.OUTPUT_TABLE));
      this.locator = this.connection.getRegionLocator(tableName);
    } catch (IOException e) {
      LOG.error(e.toString(), e);
    }

    try {
      this.startKeys = this.locator.getStartKeys();
    } catch (IOException e) {
      LOG.error(e.toString(), e);
    }
  }

  @Override
  public int getPartition(ImmutableBytesWritable key, V2 value, int numPartitions) {
    byte[] region = null;
    // only one region return 0
    if (this.startKeys.length == 1){
      return 0;
    }
    try {
      // Not sure if this is cached after a split so we could have problems
      // here if a region splits while mapping
      region = locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
    } catch (IOException e) {
      LOG.error(e.toString(), e);
    }
    for (int i = 0; i < this.startKeys.length; i++){
      if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
        if (i >= numPartitions){
          // cover if we have less reduces then regions.
          return (Integer.toString(i).hashCode()
              & Integer.MAX_VALUE) % numPartitions;
        }
        return i;
      }
    }
    // if above fails to find start key that match we need to return something
    return 0;
  }
}

被KeyValueSortReducer或者PutSortReducer继承的类Reducer:

public class Reducer {
    public Reducer() {
    }

    protected void setup(Reducer.Context context) throws IOException, InterruptedException {
    }

    protected void reduce(KEYIN key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
        Iterator i$ = values.iterator();

        while(i$.hasNext()) {
            VALUEIN value = i$.next();
            context.write(key, value);
        }

    }
    protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
    }
    public void run(Reducer.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                Iterator iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }

    }
    public abstract class Context implements ReduceContext {
    }
}
HFile导入阶段。

HFile准备就绪之后,就可以使用工具completebulkload将HFile加载到在线Hbase集群。completebulkload工具主要负责以下工作。

  1. 依次检查第一步生成的所有HFile文件,将每个文件映射到对应的Region。

  2. 将HFile文件移动到对应Region所在的HDFS文件目录下。

  3. 告知Region对应的RegionServer,加载HFile文件对外提供服务。

如果在BulkLoad的中间过程中Region发生了分裂,completebulkload工具会自动将对应的HFile文件按照新生成的Region边界切分成多个HFile文件,保证每个HFile都能与目标表当前的Region相对应。但这个过程需要读取HFile内容,因而并不高效。需要尽量减少HFile生成阶段和HFile导入阶段的延迟,最好能够在HFile生成之后立刻执行HFile导入。

包含completebulkload的BulkLoadHFiles工具类:

public class BulkLoadHFilesTool extends LoadIncrementalHFiles implements BulkLoadHFiles {

  public static final String NAME = "completebulkload";

  public BulkLoadHFilesTool(Configuration conf) {
    super(conf);
  }

  private Map convert(
      Map map) {
    return map.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
  }

  @Override
  public Map bulkLoad(TableName tableName,
      Map> family2Files) throws TableNotFoundException, IOException {
    return convert(run(family2Files, tableName));
  }

  @Override
  public Map bulkLoad(TableName tableName, Path dir)
      throws TableNotFoundException, IOException {
    return convert(run(dir, tableName));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HbaseConfiguration.create();
    int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
    System.exit(ret);
  }

}
核心流程到此结束
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/682755.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号