2021SC@SDUSC 2021SC@SDUSC
2021SC@SDUSC 2021SC@SDUSC
- 2021SC@SDUSC hbase源码分析(十四)BulkLoad功能
- BulkLoad功能
- BulkLoad核心流程
- HFile生成阶段。
- HFile导入阶段。
- 核心流程到此结束
在实际生产环境中,有这样一种场景:用户数据位于HDFS中,业务需要定期将这部分海量数据导入Hbase系统,以执行随机查询更新操作。这种场景如果调用写入API进行处理,极有可能会给RegionServer带来较大的写入压力:
-
引起RegionServer频繁flush,进而不断compact、split,影响集群稳定性。
-
引起RegionServer频繁GC,影响集群稳定性。
-
消耗大量CPU资源、带宽资源、内存资源以及IO资源,与其他业务产生资源竞争。
-
在某些场景下,比如平均KV大小比较大的场景,会耗尽RegionServer的处理线程,导致集群阻塞。
鉴于存在上述问题,Hbase提供了另一种将数据写入Hbase集群的方法——BulkLoad。BulkLoad首先使用MapReduce将待写入集群数据转换为HFile文件,再直接将这些HFile文件加载到在线集群中。显然,BulkLoad方案没有将写请求发送给RegionServer处理,可以有效避免上述一系列问题。
BulkLoad核心流程从Hbase的视角来看,BulkLoad主要由两个阶段组成:
HFile生成阶段。这个阶段会运行一个MapReduce任务,MapReduce的mapper需要自己实现,将HDFS文件中的数据读出来组装成一个复合KV,其中Key是rowkey,Value可以是KeyValue对象、Put对象甚至Delete对象;MapReduce的reducer由Hbase负责,通过方法HFileOutputFormat2.configureIncrementalLoad()进行配置,这个方法主要负责以下事项。
-
根据表信息配置一个全局有序的partitioner。
-
将partitioner文件上传到HDFS集群并写入DistributedCache。
-
设置reduce task的个数为目标表Region的个数。
-
设置输出key/value类满足HFileOutputFormat所规定的格式要求。
-
根据类型设置reducer执行相应的排序(KeyValueSortReducer或者PutSortReducer)。
这个阶段会为每个Region生成一个对应的HFile文件。
Partitioner相关的HRegionPartitioner类,他将rowKey和KeyValue组合起来:
public class HRegionPartitionerimplements Partitioner { private static final Logger LOG = LoggerFactory.getLogger(HRegionPartitioner.class); // Connection and locator are not cleaned up; they just die when partitioner is done. private Connection connection; private RegionLocator locator; private byte[][] startKeys; @Override public void configure(JobConf job) { try { this.connection = ConnectionFactory.createConnection(HbaseConfiguration.create(job)); TableName tableName = TableName.valueOf(job.get(TableOutputFormat.OUTPUT_TABLE)); this.locator = this.connection.getRegionLocator(tableName); } catch (IOException e) { LOG.error(e.toString(), e); } try { this.startKeys = this.locator.getStartKeys(); } catch (IOException e) { LOG.error(e.toString(), e); } } @Override public int getPartition(ImmutableBytesWritable key, V2 value, int numPartitions) { byte[] region = null; // only one region return 0 if (this.startKeys.length == 1){ return 0; } try { // Not sure if this is cached after a split so we could have problems // here if a region splits while mapping region = locator.getRegionLocation(key.get()).getRegionInfo().getStartKey(); } catch (IOException e) { LOG.error(e.toString(), e); } for (int i = 0; i < this.startKeys.length; i++){ if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){ if (i >= numPartitions){ // cover if we have less reduces then regions. return (Integer.toString(i).hashCode() & Integer.MAX_VALUE) % numPartitions; } return i; } } // if above fails to find start key that match we need to return something return 0; } }
被KeyValueSortReducer或者PutSortReducer继承的类Reducer:
public class ReducerHFile导入阶段。{ public Reducer() { } protected void setup(Reducer .Context context) throws IOException, InterruptedException { } protected void reduce(KEYIN key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { Iterator i$ = values.iterator(); while(i$.hasNext()) { VALUEIN value = i$.next(); context.write(key, value); } } protected void cleanup(Reducer .Context context) throws IOException, InterruptedException { } public void run(Reducer .Context context) throws IOException, InterruptedException { this.setup(context); try { while(context.nextKey()) { this.reduce(context.getCurrentKey(), context.getValues(), context); Iterator iter = context.getValues().iterator(); if (iter instanceof ValueIterator) { ((ValueIterator)iter).resetBackupStore(); } } } finally { this.cleanup(context); } } public abstract class Context implements ReduceContext { } }
HFile准备就绪之后,就可以使用工具completebulkload将HFile加载到在线Hbase集群。completebulkload工具主要负责以下工作。
-
依次检查第一步生成的所有HFile文件,将每个文件映射到对应的Region。
-
将HFile文件移动到对应Region所在的HDFS文件目录下。
-
告知Region对应的RegionServer,加载HFile文件对外提供服务。
如果在BulkLoad的中间过程中Region发生了分裂,completebulkload工具会自动将对应的HFile文件按照新生成的Region边界切分成多个HFile文件,保证每个HFile都能与目标表当前的Region相对应。但这个过程需要读取HFile内容,因而并不高效。需要尽量减少HFile生成阶段和HFile导入阶段的延迟,最好能够在HFile生成之后立刻执行HFile导入。
包含completebulkload的BulkLoadHFiles工具类:
public class BulkLoadHFilesTool extends LoadIncrementalHFiles implements BulkLoadHFiles {
public static final String NAME = "completebulkload";
public BulkLoadHFilesTool(Configuration conf) {
super(conf);
}
private Map convert(
Map map) {
return map.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}
@Override
public Map bulkLoad(TableName tableName,
Map> family2Files) throws TableNotFoundException, IOException {
return convert(run(family2Files, tableName));
}
@Override
public Map bulkLoad(TableName tableName, Path dir)
throws TableNotFoundException, IOException {
return convert(run(dir, tableName));
}
public static void main(String[] args) throws Exception {
Configuration conf = HbaseConfiguration.create();
int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
System.exit(ret);
}
}
核心流程到此结束


