2021SC@SDUSC 2021SC@SDUSC 2021SC@SDUSC 2021SC@SDUSC
- 2021SC@SDUSC hbase源码分析(二)写入数据(1)
- Hbase写入总览
- 大体写入流程
- 写入时组件交互
- 客户端处理阶段
- 1.用户提交put请求
- 单次put请求
- 多次put请求
- 如有不足或错误,欢迎指正
Hbase采用LSM树架构,这种架构更适合于写多读少的场景。
需要说明的是,Hbase服务端并没有提供update、delete接口,Hbase中对数据的更新、删除操作在服务器端也被认为为写入操作,其中更新操作会写入一个最新版本数据,删除操作会写入一条标记为deleted的KV数据。
综上,Hbase中的更新、删除操作的流程与写入流程完全一致。
虽然Hbase在不断更新,但其中的逻辑差别不大。
大体写入流程从整体架构的视角来看,写入流程可以概括为三个阶段:
- 客户端处理阶段
- Region写入阶段
- MemStore Flush阶段
本篇博客首先对客户端处理阶段进行源码分析。
写入时组件交互 客户端处理阶段在 Hbase中,大部分的操作都是在RegionServer完成的,Client端想要插入、删除、查询数据都需要先找到相应的 RegionServer。
hbase客户端处理写入请求的核心流程可以分为三步:
1.用户提交put请求用户提交put请求后,Hbase客户端会将写入的数据添加到本地缓冲区中,符合一定条件就会通过AsyncProcess异步批量提交。
get、put、delete等客户可用的函数都在src/main/java/org/apache/hadoop/hbase/client目录下的HTable.java文件中,其中有两个变量:
//AsuncProcess 对象在后续batch方法中调用了其中的submit方法进行了提交 @VisibleForTesting AsyncProcess multiAp; private final RpcRetryingCallerFactory rpcCallerFactory; private final RpcControllerFactory rpcControllerFactory;
分别定义了rpc调用的工厂和一个异步处理的进程。
客户端的put方法
@Override
public void put(final Put put) throws IOException {
//校验put不能超过10M
validatePut(put);
//获取一个callable对象,并且实现了一个匿名对象
ClientServiceCallable callable =
new ClientServiceCallable(this.connection, getName(), put.getRow(),
this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
protected Void rpcCall() throws Exception {
MutateRequest request =
RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put);
doMutate(request);
return null;
}
};
//最后调用了一个callWithRetries 一个可重试的调用
rpcCallerFactory. newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
this.operationTimeoutMs);
}
@Override public void put(final Listputs) throws IOException { for (Put put : puts) { validatePut(put); } Object[] results = new Object[puts.size()]; try { batch(puts, results, writeRpcTimeoutMs); } catch (InterruptedException e) { throw (InterruptedIOException) new InterruptedIOException().initCause(e); } }
从上面代码可以看出:Hbase客户端在进行写入初期,既可以一次put一行记录,也可以一次put多行记录。
两个方法内部都会调用validatePut方法,其主要作用是来验证Put是否有效,主要是判断kv的长度。我们追踪validatePut到其本类。代码如下:
// validate for well-formedness
static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert");
}
if (maxKeyValueSize > 0) {
for (List| list : put.getFamilyCellMap().values()) {
for (Cell cell : list) {
if (cell.getSerializedSize() > maxKeyValueSize) {
throw new IllegalArgumentException("KeyValue size too large");
}
}
}
}
}
|
可见其功能是验证put对象是否合法。
单次put请求首先我们先看单次put方法中的代码:
我们可以发现它总共做了三件事:
- 校验put不能超过10M
- 获取一个callable对象,并且实现了一个匿名对象
- 最后调用了一个callWithRetries 一个可重试的调用
接下来,我们去看callWithRetries这个方法:
@Override public T callWithRetries(RetryingCallablecallable, int callTimeout) throws IOException, RuntimeException { List exceptions = new ArrayList<>(); tracker.start(); context.clear(); for (int tries = 0;; tries++) { long expectedSleep; try { //prepare很明显就是该方法的核心内容 callable.prepare(tries != 0); interceptor.intercept(context.prepare(callable, tries)); return callable.call(getTimeout(callTimeout)); } ... } }
然后我们发现prepare这个方法,进入查看:
public void prepare(final boolean reload) throws IOException {
if (getRpcController().isCanceled()) return;
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
if (reload || location == null) {
//后续获取相关locations的方法
RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
location = id < rl.size() ? rl.getRegionLocation(id) : null;
}
if (location == null || location.getServerName() == null) {
throw new HbaseIOException("There is no location for replica id #" + id);
}
setStubByServiceName(this.location.getServerName());
}
根据其中的判断语句,我们发现其中的getRegionLocations方法,我们追根溯源,找到其核心代码:
static RegionLocations getRegionLocations(boolean useCache, int replicaId,
ClusterConnection cConnection, TableName tableName, byte[] row)
throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
RegionLocations rl;
try {
if (useCache) {
rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
} else {
rl = cConnection.relocateRegion(tableName, row, replicaId);
}
}
...
return rl;
}
其中最主要的是try-catch语句中的这个if-else语句,我们直接进入if成立的语句中的方法进行查看(其中else语句最终也会回到if中的locateRegion方法,毫无疑问,locateRegion是很重要方法):
public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
boolean retry, int replicaId) throws IOException {
checkClosed();
if (tableName == null || tableName.getName().length == 0) {
throw new IllegalArgumentException("table name cannot be null or zero length");
}
if (tableName.equals(TableName.meta_TABLE_NAME)) {
return locatemeta(tableName, useCache, replicaId);
} else {
// Region not in the cache - have to go to the meta RS
return locateRegionInmeta(tableName, row, useCache, retry, replicaId);
}
}
根据locateRegion方法的返回值,我们可以发现:上来就去缓存中去拿元数据,但是我们缓冲区并没有数据,这个时候,他会进行重试,这个时候 relocatemeta 就为true了
进入locatemeta方法后,我们最终发现这个时候就把我们的表元数据进行缓存了
private RegionLocations locatemeta(final TableName tableName,
boolean useCache, int replicaId) throws IOException {
// Hbase-10785: We cache the location of the meta itself, so that we are not overloading
// zookeeper with one request for every region lookup. We cache the meta with empty row
// key in metaCache.
byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
RegionLocations locations = null;
if (useCache) {
locations = getCachedLocation(tableName, metaCacheKey);
if (locations != null && locations.getRegionLocation(replicaId) != null) {
return locations;
}
}
然后把locations返回了。
流程:
- 获取meta表
- 根据meta表获取对应的偏移量所在的region
- 然后根据region拿到servername去访问
这样就可以写数据了
多次put请求多次put请求及后续的RS寻址、构造RPC、Region写入等阶段放在下一篇博客来分析。
如有不足或错误,欢迎指正


