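This walkthrough is based on the HBase 1.2 source code.
This week I skimmed through the client code and some of its internals, so I am taking notes before I forget them, since I will probably not read the client code much in the future. Client code creates a connection object, obtains a table object (HTable) from that connection, and reads and writes data through the HTable. This post mainly walks through the code in HTable.
The put interface can write multiple rows in one call, which is the MultiPut feature. Let's go straight to the implementation: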
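public void put(final List<Put> puts) throws IOException {
  // Add the mutations to the buffer; if the buffer holds too much data it is flushed automatically
  getBufferedMutator().mutate(puts);
  // With autoFlush enabled, flush right away
  if (autoFlush) {
    flushCommits();
  }
  // Both flush paths end up in the same logic
}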
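The code looks plain enough: the put data is added to a buffer. With autoFlush enabled it is flushed immediately; otherwise a flush is triggered once the buffer holds too much data. That is the gist, but not the whole story, so let's continue with the implementation of getBufferedMutator().mutate.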
public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
    RetriesExhaustedWithDetailsException {
  if (closed) {
    throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
  }
  // Compute the size of the data being written
  long toAddSize = 0;
  for (Mutation m : ms) {
    if (m instanceof Put) {
      validatePut((Put) m);
    }
    toAddSize += m.heapSize();
  }
  // This behavior is highly non-intuitive... it does not protect us against
  // 94-incompatible behavior, which is a timing issue because hasError, the below code
  // and setter of hasError are not synchronized. Perhaps it should be removed.
  // If an earlier asynchronous flush failed, flush synchronously
  if (ap.hasError()) {
    currentWriteBufferSize.addAndGet(toAddSize);
    writeAsyncBuffer.addAll(ms);
    backgroundFlushCommits(true);
  } else {
    // Otherwise just add the mutations to the buffer
    currentWriteBufferSize.addAndGet(toAddSize);
    writeAsyncBuffer.addAll(ms);
  }
  // Now try and queue what needs to be queued.
  // Flush while the buffered size exceeds the limit
  while (currentWriteBufferSize.get() > writeBufferSize) {
    backgroundFlushCommits(false);
  }
}
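The size accounting in mutate can be illustrated with a small stand-alone sketch. This is not HBase code: SizeBoundedBuffer, limitBytes and flushedBatches are hypothetical names, byte arrays stand in for Mutation objects, and the array length stands in for heapSize(). It only assumes the pattern visible above, which is to add to a queue, track the total size in an atomic counter, and flush while the total exceeds the limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the write buffer: tracks buffered bytes and flushes past a limit. */
final class SizeBoundedBuffer {
  private final long limitBytes; // plays the role of writeBufferSize
  private final AtomicLong bufferedBytes = new AtomicLong();
  private final ConcurrentLinkedQueue<byte[]> queue = new ConcurrentLinkedQueue<>();
  /** Records every flushed batch so that callers can inspect what was "sent". */
  final List<List<byte[]>> flushedBatches = new ArrayList<>();

  SizeBoundedBuffer(long limitBytes) {
    if (limitBytes < 0) {
      throw new IllegalArgumentException("limitBytes must be >= 0");
    }
    this.limitBytes = limitBytes;
  }

  /** Buffers the items, then flushes while the buffered size exceeds the limit. */
  void mutate(List<byte[]> items) {
    for (byte[] item : items) {
      bufferedBytes.addAndGet(item.length);
      queue.add(item);
    }
    while (bufferedBytes.get() > limitBytes) {
      flush();
    }
  }

  /** Drains everything currently queued into one batch. */
  void flush() {
    List<byte[]> batch = new ArrayList<>();
    byte[] item;
    while ((item = queue.poll()) != null) {
      batch.add(item);
      bufferedBytes.addAndGet(-item.length);
    }
    if (!batch.isEmpty()) {
      flushedBatches.add(batch);
    }
  }

  long bufferedBytes() {
    return bufferedBytes.get();
  }
}
```

With a limit of 10 bytes, two 4-byte items stay buffered, and a third 4-byte item pushes the total to 12 and triggers a single flush of all three.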
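The core job of mutate is to track the size of the data in the buffer and, once it exceeds the limit (writeBufferSize), flush the data. Next comes the implementation of backgroundFlushCommits.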
private void backgroundFlushCommits(boolean synchronous) throws
    InterruptedIOException,
    RetriesExhaustedWithDetailsException {
  LinkedList<Mutation> buffer = new LinkedList<>();
  // Keep track of the size so that this thread doesn't spin forever
  long dequeuedSize = 0;
  try {
    // Grab all of the available mutations.
    Mutation m;
    // If there's no buffer size drain everything. If there is a buffersize drain up to twice
    // that amount. This should keep the loop from continually spinning if there are threads
    // that keep adding more data to the buffer.
    // Take the data to submit out of writeAsyncBuffer until it is empty, or until the
    // dequeued size reaches twice writeBufferSize.
    // Open question: the 2x case seems to arise only because other callers keep writing
    // to writeAsyncBuffer, but HTable is supposed to be a non-thread-safe object used from
    // a single thread, so this kind of concurrency should not occur?
    while (
        (writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || synchronous)
            && (m = writeAsyncBuffer.poll()) != null) {
      buffer.add(m);
      long size = m.heapSize();
      dequeuedSize += size;
      currentWriteBufferSize.addAndGet(-size);
    }
    if (!synchronous && dequeuedSize == 0) {
      return;
    }
    // An asynchronous flush just hands the work to the thread pool
    if (!synchronous) {
      ap.submit(tableName, buffer, true, null, false);
      if (ap.hasError()) {
        LOG.debug(tableName + ": One or more of the operations have failed -"
            + " waiting for all operation in progress to finish (successfully or not)");
      }
    }
    // A synchronous flush has to wait for the tasks to finish
    if (synchronous || ap.hasError()) {
      while (!buffer.isEmpty()) {
        ap.submit(tableName, buffer, true, null, false);
      }
      RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
      if (error != null) {
        if (listener == null) {
          throw error;
        } else {
          this.listener.onException(error, this);
        }
      }
    }
  } finally {
    // Put any mutations that were not submitted back into writeAsyncBuffer
    for (Mutation mut : buffer) {
      long size = mut.heapSize();
      currentWriteBufferSize.addAndGet(size);
      dequeuedSize -= size;
      writeAsyncBuffer.add(mut);
    }
  }
}
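The drain-then-requeue shape of backgroundFlushCommits is easier to see in isolation. The following is a minimal sketch under simplifying assumptions, not the HBase implementation: DrainAndRequeue, flush and the accept predicate are hypothetical, strings stand in for mutations, and the string length stands in for heapSize(). The predicate plays the role of ap.submit, which removes the rows it takes from the local batch and leaves the rest behind.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.function.Predicate;

final class DrainAndRequeue {
  /**
   * Polls items from pending until it is empty or at least 2 * limit "bytes" were taken
   * (limit <= 0 means drain everything). Items are then offered to accept in order;
   * the first rejection stops the submission. Whatever is still in the local batch
   * is put back into pending in the finally block. Returns the number of accepted items.
   */
  static int flush(Queue<String> pending, long limit, Predicate<String> accept) {
    LinkedList<String> batch = new LinkedList<>();
    long dequeued = 0;
    int sent = 0;
    try {
      String item;
      while ((limit <= 0 || dequeued < limit * 2) && (item = pending.poll()) != null) {
        batch.add(item);
        dequeued += item.length();
      }
      while (!batch.isEmpty() && accept.test(batch.peekFirst())) {
        batch.removeFirst();
        sent++;
      }
      return sent;
    } finally {
      // Leftovers go back to the shared queue, as in the finally block above.
      pending.addAll(batch);
    }
  }
}
```

One consequence of this design is that requeued items land at the tail of the queue, so submission order is not preserved across a partially accepted flush.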
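The logic of this function is also simple: take the data out of writeAsyncBuffer and hand it to the thread pool for submission; anything that was not submitted is put back into writeAsyncBuffer. Next comes the implementation of ap.submit. Skipping a few wrapper layers, we end up in the submit overload that returns an AsyncRequestFuture.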
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
    List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
    boolean needResults) throws InterruptedIOException {
  if (rows.isEmpty()) {
    return NO_REQS_RESULT;
  }
  Map<ServerName, MultiAction<Row>> actionsByServer =
      new HashMap<ServerName, MultiAction<Row>>();
  List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
  NonceGenerator ng = this.connection.getNonceGenerator();
  long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
  // Location errors that happen before we decide what requests to take.
  List<Exception> locationErrors = null;
  List<Integer> locationErrorRows = null;
  do {
    // Wait until there is at least one slot for a new task.
    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
    // Remember the previous decisions about regions or region servers we put in the
    // final multi.
    Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
    Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
    int posInList = -1;
    Iterator<? extends Row> it = rows.iterator();
    // Everything above is variable setup and can be skimmed
    while (it.hasNext()) {
      Row r = it.next();
      HRegionLocation loc;
      try {
        if (r == null) {
          throw new IllegalArgumentException("#" + id + ", row cannot be null");
        }
        // Make sure we get 0-s replica.
        // Route the key: find which region and which server it has to be sent to
        RegionLocations locs = connection.locateRegion(
            tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
        if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
          throw new IOException("#" + id + ", no location found, aborting submit for"
              + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
        }
        loc = locs.getDefaultRegionLocation();
      } catch (IOException ex) {
        locationErrors = new ArrayList<Exception>();
        locationErrorRows = new ArrayList<Integer>();
        LOG.error("Failed to get region location ", ex);
        // This action failed before creating ars. Retain it, but do not add to submit list.
        // We will then add it to ars in an already-failed state.
        retainedActions.add(new Action<Row>(r, ++posInList));
        locationErrors.add(ex);
        locationErrorRows.add(posInList);
        it.remove();
        break; // Backward compat: we stop considering actions on location error.
      }
      // Admission checks; the key is rejected for now if:
      //  - too many tasks are already in flight for this region, or
      //  - the client as a whole has too many concurrent tasks (RPCs in progress), or
      //  - too many tasks (RPCs in progress) already target this server
      if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
        Action<Row> action = new Action<Row>(r, ++posInList);
        setNonce(ng, r, action);
        // Record the key as accepted for submission
        retainedActions.add(action);
        // TODO: replica-get is not supported on this path
        byte[] regionName = loc.getRegionInfo().getRegionName();
        // Group the accepted keys by server in actionsByServer
        addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
        it.remove();
      }
    }
  } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
  if (retainedActions.isEmpty()) return NO_REQS_RESULT;
  return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
      locationErrors, locationErrorRows, actionsByServer, pool);
}
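The three steps in submit (locate, admit, group by server) can be modeled with plain collections. This is only a sketch under stated assumptions, not how HBase stores region locations: GroupByServer, serverByRegionStart and busyRegions are hypothetical, a TreeMap keyed by region start key stands in for connection.locateRegion, and a set of "busy" regions stands in for the counters that canTakeOperation consults.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class GroupByServer {
  /**
   * For each row: find the region whose start key is the greatest key <= row,
   * skip the row if that region is busy, otherwise move it from rows into the
   * list for the server hosting the region. Skipped rows remain in rows.
   */
  static Map<String, List<String>> takeAndGroup(
      List<String> rows,
      TreeMap<String, String> serverByRegionStart,
      Set<String> busyRegions) {
    Map<String, List<String>> byServer = new HashMap<>();
    Iterator<String> it = rows.iterator();
    while (it.hasNext()) {
      String row = it.next();
      Map.Entry<String, String> region = serverByRegionStart.floorEntry(row);
      if (region == null) {
        throw new IllegalStateException("no region found for row " + row);
      }
      if (busyRegions.contains(region.getKey())) {
        continue; // rejected for now; the caller can retry it later
      }
      byServer.computeIfAbsent(region.getValue(), s -> new ArrayList<>()).add(row);
      it.remove(); // accepted rows leave the caller's list, as with it.remove() above
    }
    return byServer;
  }
}
```

The rows list must be mutable, because accepted rows are removed from it. That mirrors how the real submit shrinks the caller's buffer so that only unaccepted rows are left for the next attempt.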
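This function is fairly complex and declares a pile of variables. Its core job is to locate each submitted key (find its region and server), decide whether the key can be submitted right now, and finally group the accepted keys by server. Locating is done by connection.locateRegion, which I will analyze separately later. The admission check is done by canTakeOperation; see the comments in the code for the rules. Grouping keys by server is done by addAction. Further down, submitMultiActions is called, which in turn reaches sendMultiAction: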
private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
    int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
  // Run the last item on the same thread if we are already on a send thread.
  // We hope most of the time it will be the only item, so we can cut down on threads.
  int actionsRemaining = actionsByServer.size();
  // This iteration is by server (the HRegionLocation comparator is by server portion only).
  // Submit one server at a time
  for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
    ServerName server = e.getKey();
    MultiAction<Row> multiAction = e.getValue();
    incTaskCounters(multiAction.getRegions(), server);
    Collection<? extends Runnable> runnables =
        getNewMultiActionRunnable(server, multiAction, numAttempt);
    // make sure we correctly count the number of runnables before we try to reuse the send
    // thread, in case we had to split the request into different runnables because of backoff
    if (runnables.size() > actionsRemaining) {
      actionsRemaining = runnables.size();
    }
    // run all the runnables
    for (Runnable runnable : runnables) {
      if ((--actionsRemaining == 0) && reuseThread) {
        runnable.run();
      } else {
        try {
          pool.submit(runnable);
        } catch (Throwable t) {
          ...
        }
      }
    }
  }
}
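The "run the last item on the calling thread" trick in sendMultiAction is a general pattern. Here is a minimal sketch with hypothetical names (ReuseCallerThread, dispatch); it leaves out the task counters and the error handling in the elided catch block above.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;

final class ReuseCallerThread {
  /**
   * Submits every task to the pool, except that the last task runs directly on
   * the calling thread when reuseThread is true. With a single task this avoids
   * a thread hand-off entirely.
   */
  static void dispatch(List<Runnable> tasks, ExecutorService pool, boolean reuseThread) {
    int remaining = tasks.size();
    for (Runnable task : tasks) {
      if (--remaining == 0 && reuseThread) {
        task.run();
      } else {
        pool.submit(task);
      }
    }
  }
}
```

The trade-off is that the calling thread blocks for as long as that last task takes, which only makes sense when the caller is itself already a worker thread, as the original comment in sendMultiAction notes.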
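This function is also quite simple: for each server it creates runnables and submits them to the pool for execution, with the last task reusing the current thread. The core submission logic lives in the runnables, which are produced by getNewMultiActionRunnable: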
private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
    MultiAction<Row> multiAction,
    int numAttempt) {
  ...
  // group the actions by the amount of delay
  Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
      .size());
  // split up the actions
  for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
    Long backoff = getBackoff(server, e.getKey());
    // Get the backoff time and group the keys to submit by it; keys with the same
    // backoff go into the same runner.
    // The default policy uses a backoff of 0: no waiting, submit immediately.
    // The backoff is computed from the region's state; see the code for the policy.
    DelayingRunner runner = actions.get(backoff);
    if (runner == null) {
      actions.put(backoff, new DelayingRunner(backoff, e));
    } else {
      runner.add(e);
    }
  }
  List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
  for (DelayingRunner runner : actions.values()) {
    String traceText = "AsyncProcess.sendMultiAction";
    // Create the SingleServerRequestRunnable that actually performs the RPC
    Runnable runnable =
        new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
            callsInProgress);
    // use a delay runner only if we need to sleep for some time
    if (runner.getSleepTime() > 0) {
      ...
    } else {
      if (connection.getConnectionMetrics() != null) {
        connection.getConnectionMetrics().incrNormalRunners();
      }
    }
    runnable = Trace.wrap(traceText, runnable);
    toReturn.add(runnable);
  }
  return toReturn;
}
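The grouping step above, one runner per distinct backoff value, can be sketched as follows. BackoffGrouping, groupByBackoff and the backoffFor function are hypothetical; in the real code the backoff comes from getBackoff(server, regionName) and the groups are DelayingRunner objects rather than plain lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

final class BackoffGrouping {
  /** Groups region names by the backoff (in milliseconds) that backoffFor assigns to them. */
  static Map<Long, List<String>> groupByBackoff(
      List<String> regions, ToLongFunction<String> backoffFor) {
    Map<Long, List<String>> groups = new HashMap<>();
    for (String region : regions) {
      long backoff = backoffFor.applyAsLong(region);
      // Regions sharing a backoff value share one group, i.e. one request.
      groups.computeIfAbsent(backoff, b -> new ArrayList<>()).add(region);
    }
    return groups;
  }
}
```

When every region maps to a backoff of 0, the result has a single group, which corresponds to the common case of one RPC per server.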
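The core job of this function is to compute a backoff time for the keys being submitted. If a region is heavily loaded, the submission of its keys is delayed. Keys with the same backoff time are assigned to the same runner; the default backoff is 0, so in the end there is only one runner. DelayingRunner is a submission task that wraps a wait time, and the class that actually performs the submission is SingleServerRequestRunnable. Finally, we have reached the last layer. Here is the code: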
public void run() {
  MultiResponse res;
  MultiServerCallable<Row> callable = null;
  try {
    callable = createCallable(server, tableName, multiAction);
    try {
      RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
      if (callsInProgress != null) callsInProgress.add(callable);
      res = caller.callWithoutRetries(callable, timeout);
      if (res == null) {
        // Cancelled
        return;
      }
    } catch (IOException e) {
      ...
    } catch (Throwable t) {
      ...
    }
    // Normal case: we received an answer from the server, and it's not an exception.
    receiveMultiAction(multiAction, server, res, numAttempt);
  } catch (Throwable t) {
    ...
  } finally {
    decTaskCounters(multiAction.getRegions(), server);
    if (callsInProgress != null && callable != null) {
      callsInProgress.remove(callable);
    }
  }
}
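The run method of SingleServerRequestRunnable is where the RPC is finally issued. I will not analyze the internals of the RPC layer here; if I read that module later I will cover it separately.
Summary: when autoFlush is off, the client buffers part of the data and only submits it to the server once a size threshold is exceeded. This improves throughput, but carries a risk of losing data that is still sitting in the buffer. The final submission logic is very simple: all submitted keys are grouped by server and each server's keys are submitted as one group. If there is no backoff (the usual case), the keys for all regions on one server are sent in a single RPC. A synchronous flush waits for the submission to complete before returning, while an asynchronous flush returns as soon as the tasks have been queued.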
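Get and Delete requests follow almost identical logic; both Get and Delete implement the Row interface.
A single-key get/delete creates an RPC call directly and then waits for the response; the logic is very simple.
The multi-key versions work much like the PUT path described above and also end up in sendMultiAction. The difference is that get and delete are synchronous calls: they must wait for the RPC to return, and only then can the get return (which is easy to understand).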
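The "fan out per server, then block until every response is back" behaviour of a multi-key get can be sketched with the standard executor API. This is an illustration only, with hypothetical names (BlockingMultiGet, getAll, and an rpc function standing in for the real call to a region server); the actual client waits on its own request future rather than using invokeAll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

final class BlockingMultiGet {
  /**
   * Issues one call per server and returns only after all calls have completed.
   * Results are concatenated in the iteration order of keysByServer.
   */
  static List<String> getAll(
      Map<String, List<String>> keysByServer,
      BiFunction<String, List<String>, List<String>> rpc,
      ExecutorService pool) throws InterruptedException, ExecutionException {
    List<Callable<List<String>>> calls = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : keysByServer.entrySet()) {
      calls.add(() -> rpc.apply(e.getKey(), e.getValue()));
    }
    List<String> results = new ArrayList<>();
    // invokeAll blocks until every call has finished, which is what makes this synchronous.
    for (Future<List<String>> done : pool.invokeAll(calls)) {
      results.addAll(done.get());
    }
    return results;
  }
}
```

A buffered put differs only in the last step: it can return right after the tasks are queued, whereas a get needs the responses before it has anything to return.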
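Scan is more complex, is also a module where problems tend to show up, and has more configurable parameters. I will analyze it separately after I have read through it.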



