- Introduction
- Summary
Introduction
HBase is a distributed database that retrieves data by RowKey. It splits a table's data horizontally (by row) into Regions, and each Region has a starting row StartKey and an ending row EndKey. A Region is eventually brought online on a RegionServer, which serves reads and writes for it. So how does HBase implement data retrieval; that is, how does it map a row being read or written to the Region and RegionServer that host it? In this post, we will look at how an HRegion is located.
We have previously studied Scan, HBase's mechanism for reading data. During a Scan, every RPC to the server is a read request against a specific Region on its RegionServer, with the results cached on the client. In the next() method of the Scanner that iterates over the data, the client checks whether the cache holds data; if not, it loads the cache and then pulls data straight from it. The code is as follows:
@Override
public Result next() throws IOException {
  try {
    lock.lock();
    while (cache.isEmpty()) {
      handleException();
      if (this.closed) {
        return null;
      }
      try {
        notEmpty.await();
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Interrupted when wait to load cache");
      }
    }
    Result result = pollCache();
    if (prefetchCondition()) {
      notFull.signalAll();
    }
    return result;
  } finally {
    lock.unlock();
    handleException();
  }
}
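The next() above is the consumer half of a producer/consumer handoff: the cache is filled by a background prefetcher (loadCache()) while the consumer blocks on the notEmpty condition whenever the cache is empty, and signals notFull once the cache drains enough to warrant another prefetch. A minimal standalone sketch of that pattern, with hypothetical class and method names rather than HBase's own:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the lock/condition handoff used by the scanner's result cache:
// the consumer waits on notEmpty while the cache is empty; the producer waits
// on notFull while the cache is at capacity.
public class BoundedResultCache<T> {
  private final Queue<T> cache = new ArrayDeque<>();
  private final int capacity;
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();
  private final Condition notFull = lock.newCondition();
  private boolean closed = false;

  public BoundedResultCache(int capacity) {
    this.capacity = capacity;
  }

  // Consumer side: mirrors next() above.
  public T poll() throws InterruptedException {
    lock.lock();
    try {
      while (cache.isEmpty()) {
        if (closed) {
          return null;
        }
        notEmpty.await();
      }
      T result = cache.poll();
      if (cache.size() < capacity / 2) { // prefetchCondition() analogue
        notFull.signalAll();
      }
      return result;
    } finally {
      lock.unlock();
    }
  }

  // Producer side: loadCache() analogue; fills the cache and wakes consumers.
  public void offer(T item) throws InterruptedException {
    lock.lock();
    try {
      while (cache.size() >= capacity && !closed) {
        notFull.await();
      }
      cache.add(item);
      notEmpty.signalAll();
    } finally {
      lock.unlock();
    }
  }

  public void close() {
    lock.lock();
    try {
      closed = true;
      notEmpty.signalAll();
      notFull.signalAll();
    } finally {
      lock.unlock();
    }
  }
}
```

The closed flag plays the same role as in the real scanner: a consumer that wakes up on an empty, closed cache returns null instead of blocking forever.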
The loadCache() method that fills this cache in turn invokes call(), which sends an RPC to the Region on its RegionServer. So how does it locate the Region? Let's first look at call():
Result[] call(Scan scan, ScannerCallableWithReplicas callable,
    RpcRetryingCaller caller, int scannerTimeout)
    throws IOException, RuntimeException {
  if (Thread.interrupted()) {
    throw new InterruptedIOException();
  }
  // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
  // we do a callWithRetries
  // caller is of type RpcRetryingCaller
  // callable is of type ScannerCallableWithReplicas
  return caller.callWithoutRetries(callable, scannerTimeout);
}
In fact, caller is an RpcRetryingCaller and callable is a ScannerCallableWithReplicas. The key lines of RpcRetryingCaller's callWithoutRetries() method are:
// First call prepare(), then call(), with a timeout of callTimeout
callable.prepare(false);
return callable.call(callTimeout);
So what ultimately runs is callable's call() method, i.e., ScannerCallableWithReplicas.call(). Let's follow its key behavior.
Region location is performed by RpcRetryingCallerWithReadReplicas's getRegionLocations() method. It takes several key parameters -- the use-cache flag useCache, the replica id replicaId, the cluster connection cConnection (a ClusterConnection), the table name tableName, and the row -- and returns a RegionLocations object describing the Region's location. RegionLocations contains an array named locations, defined as follows:
// locations array contains the HRL objects for known region replicas indexed by the replicaId.
// Elements can be null if the region replica is not known at all. A null value indicates
// that there is a region replica with the index as replicaId, but the location is not known
// in the cache.
private final HRegionLocation[] locations; // replicaId -> HRegionLocation.
It is an array of HRegionLocation: effectively a mapping from replicaId to HRegionLocation, where replicaId is the array index. The getRegionLocations() call above passes RegionReplicaUtil.DEFAULT_REPLICA_ID as the replicaId, i.e., 0. So what is an HRegionLocation? Its two key member variables tell the story:
private final HRegionInfo regionInfo;
private final ServerName serverName;
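The replicaId-indexed lookup can be sketched with a simplified stand-in (the class, field types, and constant here are illustrative, not HBase's actual ones):

```java
// Simplified sketch of how RegionLocations indexes replica locations by
// replicaId: array slot i holds the location of replica i, or null if the
// location of that replica is not known in the cache.
public class SimpleRegionLocations {
  public static final int DEFAULT_REPLICA_ID = 0; // the primary replica

  private final String[] locations; // replicaId -> "host:port", null if unknown

  public SimpleRegionLocations(String... locations) {
    this.locations = locations;
  }

  // Mirrors RegionLocations.getRegionLocation(int): bounds-check, then index.
  public String getRegionLocation(int replicaId) {
    if (replicaId < 0 || replicaId >= locations.length) {
      return null;
    }
    return locations[replicaId];
  }
}
```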
Let's start from RpcRetryingCallerWithReadReplicas's getRegionLocations() method:
static RegionLocations getRegionLocations(boolean useCache, int replicaId,
    ClusterConnection cConnection, TableName tableName, byte[] row)
    throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
  RegionLocations rl;
  try {
    if (useCache) {
      rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
    } else {
      rl = cConnection.relocateRegion(tableName, row, replicaId);
    }
  } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) {
    throw e;
  } catch (IOException e) {
    throw new RetriesExhaustedException("Cannot get the location for replica" + replicaId
        + " of region for " + Bytes.toStringBinary(row) + " in " + tableName, e);
  }
  if (rl == null) {
    throw new RetriesExhaustedException("Cannot get the location for replica" + replicaId
        + " of region for " + Bytes.toStringBinary(row) + " in " + tableName);
  }
  return rl;
}
The logic is simple: two cases, with cache and without. And, as one might guess, even with the cache enabled, a cache miss still goes through the no-cache path, loads the obtained Region location into the cache, and then returns it to the caller. So in the end, all we really need to study is how a Region is located when the cache is not used.
Let's first see how Region location works without the cache. It calls ClusterConnection's relocateRegion() method. ClusterConnection is an interface; it is instantiated in HTable and passed down layer by layer. Its instantiation, in HTable's constructor, looks like this:
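The cache-aside flow just described -- consult the cache when allowed, fall through to a real lookup on a miss, and refresh the cache either way -- can be sketched as follows (all names are hypothetical; scanMeta() is a stand-in for the real meta-table query, instrumented with a counter so the caching behavior is observable):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of the cache-aside location flow: with useCache=true the cache is
// consulted first and a miss falls through to the real lookup; with
// useCache=false the lookup is always performed. Either way the fresh result
// is put back into the cache before being returned.
public class LocationCache<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();

  public V locate(K key, boolean useCache, Function<K, V> lookup) {
    if (useCache) {
      V cached = cache.get(key);
      if (cached != null) {
        return cached;
      }
    }
    // Cache miss, or the caller explicitly asked for a fresh lookup.
    V fresh = lookup.apply(key);
    cache.put(key, fresh);
    return fresh;
  }

  // Hypothetical stand-in for a meta-table scan; counts invocations so the
  // cache behavior is observable in tests.
  public static int metaScans = 0;

  public static String scanMeta(String row) {
    metaScans++;
    return "region-for-" + row;
  }
}
```

Note how relocateRegion() corresponds to useCache=false here: it does not skip the cache entirely, it bypasses the read but still repopulates the entry.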
this.connection = ConnectionManager.getConnectionInternal(conf);
It is obtained via ConnectionManager's static getConnectionInternal() method from the configuration conf. Its code:
static ClusterConnection getConnectionInternal(final Configuration conf)
    throws IOException {
  // Build an HConnectionKey from the configuration conf
  HConnectionKey connectionKey = new HConnectionKey(conf);
  synchronized (CONNECTION_INSTANCES) {
    // First look up the HConnectionImplementation connection by HConnectionKey in
    // CONNECTION_INSTANCES, a map from HConnectionKey to HConnectionImplementation
    HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
    if (connection == null) { // Not present in CONNECTION_INSTANCES
      // Create an HConnectionImplementation via createConnection()
      connection = (HConnectionImplementation)createConnection(conf, true);
      // Store the new HConnectionImplementation under its HConnectionKey
      CONNECTION_INSTANCES.put(connectionKey, connection);
    } else if (connection.isClosed()) { // Present in CONNECTION_INSTANCES but already closed
      // Call ConnectionManager's deleteConnection() to drop the record for connectionKey:
      // 1. call decCount() to decrement the count;
      // 2. remove the record for connectionKey from CONNECTION_INSTANCES;
      // 3. call HConnectionImplementation's internalClose() to tear down the connection
      ConnectionManager.deleteConnection(connectionKey, true);
      // Create a new HConnectionImplementation via createConnection()
      connection = (HConnectionImplementation)createConnection(conf, true);
      // Store the new HConnectionImplementation under its HConnectionKey
      CONNECTION_INSTANCES.put(connectionKey, connection);
    }
    // Increment the connection's use counter
    connection.incCount();
    // Return the connection
    return connection;
  }
}
HConnectionKey is essentially the key of a connection, built from important settings such as hbase.zookeeper.quorum and hbase.zookeeper.property.clientPort. Obtaining a connection is simple: if a connection with the same key was created before, it is fetched from the CONNECTION_INSTANCES map by its HConnectionKey, its use counter is incremented, and it is returned directly. If none is found, a new one is created from the HConnectionKey and added to CONNECTION_INSTANCES. And if the fetched connection is already closed, ConnectionManager's deleteConnection() removes the record for connectionKey, a new HConnectionImplementation is created, and it is added to CONNECTION_INSTANCES.
So the implementation class of the ClusterConnection above is HConnectionImplementation. Back on track, let's continue with Region location and first see how the no-cache case is handled. Here is HConnectionImplementation's relocateRegion() method:
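The pooling logic of getConnectionInternal() -- one connection per configuration key, eviction and replacement of closed entries, and a use counter bumped on every lookup -- can be sketched in a standalone form (class and field names here are simplified stand-ins, not HBase's):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the connection pooling in getConnectionInternal(): connections are
// keyed by configuration (the HConnectionKey analogue is a plain String here);
// a closed entry is evicted and replaced, and every lookup bumps a use count.
public class ConnectionPool {
  // Hypothetical simplified connection: just a closed flag and a use count.
  public static class Conn {
    public boolean closed = false;
    public int useCount = 0;

    public void close() {
      closed = true;
    }
  }

  private final Map<String, Conn> instances = new HashMap<>();

  public synchronized Conn getConnection(String key) {
    Conn conn = instances.get(key);
    if (conn == null) {
      conn = new Conn();          // createConnection() analogue
      instances.put(key, conn);
    } else if (conn.closed) {
      instances.remove(key);      // deleteConnection() analogue
      conn = new Conn();
      instances.put(key, conn);
    }
    conn.useCount++;              // incCount() analogue
    return conn;
  }
}
```

As in the real code, callers that pass the same configuration share one physical connection, which is why the use counter matters: the connection is only torn down when the count drains back to zero.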
@Override
public RegionLocations relocateRegion(final TableName tableName,
final byte [] row, int replicaId) throws IOException{
// Since this is an explicit request not to use any caching, finding
// disabled tables should not be desirable. This will ensure that an exception is thrown when
// the first time a disabled table is interacted with.
if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
}
return locateRegion(tableName, row, false, true, replicaId);
}
It first performs a necessary check: if the table is not the meta table and is disabled, it throws TableNotEnabledException directly. It then calls locateRegion() to locate the Region, passing useCache as false and retry as true, i.e., do not use the cache, and do retry.
Before analyzing locateRegion(), let's loop back to the cached case: it also calls locateRegion(), the only difference being that the useCache flag it passes is true. Both paths converge, so we only need to study locateRegion():
@Override
public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
    boolean retry) throws IOException {
  return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}

@Override
public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
    boolean retry, int replicaId) throws IOException {
  checkClosed();
  if (tableName == null || tableName.getName().length == 0) {
    throw new IllegalArgumentException("table name cannot be null or zero length");
  }
  if (tableName.equals(TableName.META_TABLE_NAME)) {
    return locateMeta(tableName, useCache, replicaId);
  } else {
    // Region not in the cache - have to go to the meta RS
    return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
  }
}
locateRegion() starts with some necessary checks:
1. It checks the closed flag marking whether the connection is closed; if true, it throws an IOException.
2. It checks the table name tableName; an empty name throws an IllegalArgumentException.
Then, depending on whether the table is the meta table, it proceeds as follows:
1. If it is the meta table, it calls locateMeta() directly to do the location;
2. If it is not the meta table and the Region is not in the cache, it must go to the meta RS, via locateRegionInMeta().
Today we look at the non-meta case first. Here is locateRegionInMeta():
private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
    boolean retry, int replicaId) throws IOException {
  // If we are supposed to be using the cache, look in the cache to see if we already have the
  // region.
  if (useCache) {
    RegionLocations locations = getCachedLocation(tableName, row);
    if (locations != null && locations.getRegionLocation(replicaId) != null) {
      return locations;
    }
  }
  // build the key of the meta region we should be looking for.
  // the extra 9's on the end are necessary to allow "exact" matches
  // without knowing the precise region names.
  byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
  byte[] metaStopKey =
    RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
  Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
    .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
    .setReadType(ReadType.PREAD);
  switch (this.metaReplicaMode) {
    case LOAD_BALANCE:
      int metaReplicaId = this.metaReplicaSelector.select(tableName, row,
        RegionLocateType.CURRENT);
      if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
        // If the selector gives a non-primary meta replica region, then go with it.
        // Otherwise, just go to primary in non-hedgedRead mode.
        s.setConsistency(Consistency.TIMELINE);
        s.setReplicaId(metaReplicaId);
      }
      break;
    case HEDGED_READ:
      s.setConsistency(Consistency.TIMELINE);
      break;
    default:
      // do nothing
  }
  int maxAttempts = (retry ? numTries : 1);
  boolean relocateMeta = false;
  for (int tries = 0; ; tries++) {
    if (tries >= maxAttempts) {
      throw new NoServerForRegionException("Unable to find region for "
          + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
    }
    if (useCache) {
      RegionLocations locations = getCachedLocation(tableName, row);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    } else {
      // If we are not supposed to be using the cache, delete any existing cached location
      // so it won't interfere.
      // We are only supposed to clean the cache for the specific replicaId
      metaCache.clearCache(tableName, row, replicaId);
    }
    // Query the meta region
    long pauseBase = this.pause;
    takeUserRegionLock();
    try {
      // We don't need to check if useCache is enabled or not. Even if useCache is false
      // we already cleared the cache for this row before acquiring userRegion lock so if this
      // row is present in cache that means some other thread has populated it while we were
      // waiting to acquire user region lock.
      RegionLocations locations = getCachedLocation(tableName, row);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
      if (relocateMeta) {
        relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
          RegionInfo.DEFAULT_REPLICA_ID);
      }
      s.resetMvccReadPoint();
      try (ReversedClientScanner rcs =
          new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
        boolean tableNotFound = true;
        for (;;) {
          Result regionInfoRow = rcs.next();
          if (regionInfoRow == null) {
            if (tableNotFound) {
              throw new TableNotFoundException(tableName);
            } else {
              throw new IOException(
                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
            }
          }
          tableNotFound = false;
          // convert the row result into the HRegionLocation we need!
          locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
          if (locations == null || locations.getRegionLocation(replicaId) == null) {
            throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
          }
          RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
          if (regionInfo == null) {
            throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
              ", row=" + regionInfoRow);
          }
          // See HBASE-20182. It is possible that we locate to a split parent even after the
          // children are online, so here we need to skip this region and go to the next one.
          if (regionInfo.isSplitParent()) {
            continue;
          }
          if (regionInfo.isOffline()) {
            throw new RegionOfflineException("Region offline; disable table call? " +
              regionInfo.getRegionNameAsString());
          }
          // It is possible that the split children have not been online yet and we have skipped
          // the parent in the above condition, so we may have already reached a region which does
          // not contains us.
          if (!regionInfo.containsRow(row)) {
            throw new IOException(
              "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
          }
          ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
          if (serverName == null) {
            throw new NoServerForRegionException("No server address listed in " +
              TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
              " containing row " + Bytes.toStringBinary(row));
          }
          if (isDeadServer(serverName)) {
            throw new RegionServerStoppedException(
              "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
              " is managed by the server " + serverName + ", but it is dead.");
          }
          // Instantiate the location
          cacheLocation(tableName, locations);
          return locations;
        }
      }
    } catch (TableNotFoundException e) {
      // if we got this error, probably means the table just plain doesn't
      // exist. rethrow the error immediately. this should always be coming
      // from the HTable constructor.
      throw e;
    } catch (LocalConnectionClosedException cce) {
      // LocalConnectionClosedException is specialized instance of DoNotRetryIOE.
      // Thrown when we check if this connection is closed. If it is, don't retry.
      throw cce;
    } catch (IOException e) {
      ExceptionUtil.rethrowIfInterrupt(e);
      if (e instanceof RemoteException) {
        e = ((RemoteException)e).unwrapRemoteException();
      }
      if (e instanceof CallQueueTooBigException) {
        // Give a special check on CallQueueTooBigException, see HBASE-17114
        pauseBase = this.pauseForCQTBE;
      }
      if (tries < maxAttempts - 1) {
        LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " +
          "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts,
          ConnectionUtils.getPauseTime(pauseBase, tries), e);
      } else {
        throw e;
      }
      // only relocate the parent region if necessary
      relocateMeta =
        !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
    } finally {
      userRegionLock.unlock();
    }
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Giving up trying to location region in " +
        "meta: thread is interrupted.");
    }
  }
}
locateRegionInMeta() looks up the location of the Region holding a given row of a non-meta table. It does so, in essence, by scanning HBase's meta table for that row's Region location. Its logic is as follows:
1. Based on the useCache flag: if we may consult the cache, first check whether the Region is already cached, via getCachedLocation() with tableName and row; if present, return it, otherwise continue;
2. On a cache miss, call createRegionName() with the table name tableName, the row, and the string "99999999999999" to build the meta row key metaKey;
3. Construct a Scan whose start row is this metaKey; it is a reversed small scan;
4. Determine the retry limit: if the retry flag is true, the limit is numTries, i.e., the value of hbase.client.retries.number, which defaults to 31 if unset;
5. In a loop, while the retry count tries has not reached the limit and no Region location has been found:
5.1. First check whether tries has reached the limit; if so, throw NoServerForRegionException;
5.2. Depending on whether the cache may be used:
5.2.1. If yes, check the cache once more each iteration; if present, return;
5.2.2. If not, clear any existing cached entry so it cannot interfere with the lookup, by calling metaCache's clearCache() with tableName and row;
5.3. Construct the ReversedClientScanner instance rcs over the meta table, whose name is fixed as hbase:meta (namespace "hbase", qualifier "meta"). Note that this step is itself a nested scan, which must in turn locate a Region by table and row -- the table here being the meta table -- so we are back at the meta vs. non-meta if...else above; we will cover locating the meta table itself later;
5.4. Fetch the single result regionInfoRow via the scanner's next() method;
5.5. Close the scanner;
5.6. If regionInfoRow is null, throw TableNotFoundException;
5.7. Convert the Result into the RegionLocations we need: regionInfoRow -> locations;
5.8. Extract the Region information from locations;
5.9. Perform the necessary data and state validation, e.g.:
5.9.1. verify that the table name matches;
5.9.2. verify whether the Region has split;
5.9.3. verify whether the Region is offline;
5.9.4. get the ServerName from locations by replicaId and verify that the server is not dead;
5.10. Cache the obtained location information locations via cacheLocation(), and return it;
5.11. If an exception occurs along the way, sleep for a while and retry; the sleep time depends on pause and tries, and generally grows with each attempt (jitter aside).
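The key construction in steps 2 and 3 works because meta row keys have the form table,startKey,timestamp and sort lexicographically: a reversed scan starting at table,row,99999999999999 returns the last meta row whose key is less than or equal to the probe, i.e., the region whose StartKey <= row. That "largest key not greater than the probe" semantics can be sketched with a TreeMap over simplified string keys (all names here are hypothetical, not HBase's):

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of what the reversed meta scan computes: among regions sorted by
// StartKey, the region containing a row is the last one whose StartKey <= row.
// TreeMap.floorEntry() provides exactly that lookup.
public class RegionLocator {
  // StartKey -> region name; "" is the first region's StartKey.
  private final TreeMap<String, String> regionsByStartKey = new TreeMap<>();

  public void addRegion(String startKey, String regionName) {
    regionsByStartKey.put(startKey, regionName);
  }

  // Returns the region whose key range contains the row, or null if none.
  public String locate(String row) {
    Map.Entry<String, String> e = regionsByStartKey.floorEntry(row);
    return e == null ? null : e.getValue();
  }
}
```

This is also why the client must validate the result afterwards (step 5.9): a floor lookup always returns *some* region with a smaller-or-equal StartKey, so the containsRow() check is what catches the case where a split or move has left a stale boundary.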
That is the whole process in outline; we will pick up the remaining details in the next post.
Summary
That concludes today's content: this post covered how HBase implements data retrieval, i.e., how a row being read or written is accurately located to the Region and RegionServer that host it.



