HBase Source Code Analysis (5): Region Location (Part 1) 2021SC@SDUSC

Table of Contents
  • Preface
  • Summary


Preface

HBase is a distributed database that retrieves data by RowKey. It splits a table's data horizontally into Regions, each bounded by a start row, StartKey, and an end row, EndKey. Every Region is eventually brought online on some RegionServer, which then serves reads and writes for it. So how does HBase actually retrieve data, i.e., how does it map a row that needs to be read or written to the Region, and the RegionServer, that hold it? That is what we will examine here: HRegion location.
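As a mental model (this is not HBase code; it is a simplified sketch that assumes plain string keys and invents a `RegionIndex` class), locating the Region for a row amounts to a floor lookup on the sorted region start keys:

```java
import java.util.TreeMap;

// Simplified model of row-to-Region routing: regions partition the table into
// row ranges [StartKey, EndKey), sorted by start key. The region owning a row
// is the one with the greatest start key <= row.
public class RegionIndex {
    // startKey -> region name; "" stands for the first region's open start
    private final TreeMap<String, String> regionsByStartKey = new TreeMap<>();

    public void addRegion(String startKey, String regionName) {
        regionsByStartKey.put(startKey, regionName);
    }

    // Floor lookup: greatest startKey that is <= row.
    public String locate(String row) {
        return regionsByStartKey.floorEntry(row).getValue();
    }
}
```

For example, with regions starting at `""`, `"g"`, and `"p"`, the row `"mango"` falls into the `["g", "p")` region.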


We have previously studied Scan, HBase's data-reading path. During a Scan, every RPC to the server side targets a specific Region on its RegionServer and caches the returned data on the client. In the scanner's next() method, which iterates over results, the client checks whether the cache holds data; if not, it loads the cache, then pulls results directly from it. The code is as follows:

@Override
  public Result next() throws IOException {
    try {
      lock.lock();
      while (cache.isEmpty()) {
        handleException();
        if (this.closed) {
          return null;
        }
        try {
          notEmpty.await();
        } catch (InterruptedException e) {
          throw new InterruptedIOException("Interrupted when wait to load cache");
        }
      }

      Result result = pollCache();
      if (prefetchCondition()) {
        notFull.signalAll();
      }
      return result;
    } finally {
      lock.unlock();
      handleException();
    }
  }
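The next() method above is a classic bounded-buffer consumer: it waits on notEmpty while the cache is empty and signals notFull once there is room to prefetch more. A minimal standalone sketch of the same pattern (the `PrefetchCache` class and its capacity handling are ours, not HBase's):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Bounded prefetch buffer in the style of the next() method above:
// consumers block on notEmpty while the cache is empty; producers block on
// notFull when it is at capacity; each side signals the other's condition.
public class PrefetchCache<T> {
    private final Queue<T> cache = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public PrefetchCache(int capacity) { this.capacity = capacity; }

    // Producer side: loadCache() would call something like this.
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (cache.size() >= capacity) {
                notFull.await();
            }
            cache.add(item);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Consumer side: mirrors the wait-then-poll structure of next().
    public T next() throws InterruptedException {
        lock.lock();
        try {
            while (cache.isEmpty()) {
                notEmpty.await();
            }
            T result = cache.poll();
            notFull.signalAll();
            return result;
        } finally {
            lock.unlock();
        }
    }
}
```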

The loadCache() method that fills this cache in turn invokes call(), which sends the RPC to the Region on the corresponding RegionServer. So how is that Region located? Let's first look at this call() method:

Result[] call(Scan scan, ScannerCallableWithReplicas callable,
      RpcRetryingCaller caller, int scannerTimeout)
      throws IOException, RuntimeException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
    // we do a callWithRetries
    // caller is of type RpcRetryingCaller
    // callable is of type ScannerCallableWithReplicas
    return caller.callWithoutRetries(callable, scannerTimeout);
  }

Here caller is in fact an RpcRetryingCaller and callable a ScannerCallableWithReplicas. The key part of RpcRetryingCaller's callWithoutRetries() method is:

// prepare() is called first, then call(), with a timeout of callTimeout
  callable.prepare(false);
  return callable.call(callTimeout);

So what ultimately runs is the callable's call() method, that is, ScannerCallableWithReplicas.call(), and that is where Region location takes place.

Region location is performed by calling RpcRetryingCallerWithReadReplicas' getRegionLocations() method. It takes several key parameters: the cache flag useCache, the replica id replicaId, the cluster connection cConnection, the table name tableName, and the target row. It returns a RegionLocations object describing the Region's location. RegionLocations contains an array, locations, defined as follows:

// locations array contains the HRL objects for known region replicas indexed by the replicaId.
  // elements can be null if the region replica is not known at all. A null value indicates
  // that there is a region replica with the index as replicaId, but the location is not known
  // in the cache.
  private final HRegionLocation[] locations; // replicaId -> HRegionLocation.

It is an array of HRegionLocation; effectively it stores a mapping from replicaId to HRegionLocation, with the replicaId as the array index. The getRegionLocations() call above passes RegionReplicaUtil.DEFAULT_REPLICA_ID, i.e. 0, as the replicaId. What, then, is an HRegionLocation? Its two key member variables tell the story:

 private final HRegionInfo regionInfo;
  private final ServerName serverName; 
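To make the replicaId-indexed array concrete, here is a simplified stand-in (`SimpleRegionLocations` is our illustrative name; real HRegionLocation objects are reduced to host:port strings):

```java
// Simplified sketch of RegionLocations' replicaId -> location mapping:
// the array index is the replica id, index 0 (DEFAULT_REPLICA_ID) is the
// primary replica, and a null slot means the location is not known.
public class SimpleRegionLocations {
    public static final int DEFAULT_REPLICA_ID = 0;

    private final String[] locations; // replicaId -> "host:port" (null = unknown)

    public SimpleRegionLocations(String... locations) {
        this.locations = locations;
    }

    // Returns null when the replica's location is not known in the cache.
    public String getRegionLocation(int replicaId) {
        return replicaId < locations.length ? locations[replicaId] : null;
    }

    public String getDefaultRegionLocation() {
        return getRegionLocation(DEFAULT_REPLICA_ID);
    }
}
```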

Let's start from RpcRetryingCallerWithReadReplicas' getRegionLocations() method:

 static RegionLocations getRegionLocations(boolean useCache, int replicaId,
                 ClusterConnection cConnection, TableName tableName, byte[] row)
      throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {

    RegionLocations rl;
    try {
      if (useCache) {
        rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
      } else {
        rl = cConnection.relocateRegion(tableName, row, replicaId);
      }
    } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) {
      throw e;
    } catch (IOException e) {
      throw new RetriesExhaustedException("Cannot get the location for replica" + replicaId
          + " of region for " + Bytes.toStringBinary(row) + " in " + tableName, e);
    }
    if (rl == null) {
      throw new RetriesExhaustedException("Cannot get the location for replica" + replicaId
          + " of region for " + Bytes.toStringBinary(row) + " in " + tableName);
    }

    return rl;
  }

The logic is simple: there are just two cases, with and without the cache. And, as one might guess, even on the cached path a cache miss falls through to the uncached flow, loads the resulting Region location into the cache, and then returns it to the caller. So in the end, all we really need to study is how a Region is located when the cache is not used.
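That cache-then-fallback behavior can be sketched as follows (`LocationCache` and its lookup function are illustrative, not HBase classes):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of the cache-first lookup described above: try the cache, fall back
// to the authoritative lookup on a miss (or when caching is disabled), and
// populate the cache with the result so later callers hit it.
public class LocationCache {
    private final Map<String, String> cache = new HashMap<>();
    private final Function<String, String> authoritativeLookup;
    public int misses = 0; // exposed for the demo only

    public LocationCache(Function<String, String> authoritativeLookup) {
        this.authoritativeLookup = authoritativeLookup;
    }

    public String locate(String row, boolean useCache) {
        if (useCache) {
            String cached = cache.get(row);
            if (cached != null) {
                return cached; // cache hit: no remote lookup
            }
        }
        // Cache miss or caching disabled: do the real lookup, then cache it.
        misses++;
        String location = authoritativeLookup.apply(row);
        cache.put(row, location);
        return location;
    }
}
```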
Let's look at the uncached path first. It calls ClusterConnection's relocateRegion() method. ClusterConnection is an interface; it is instantiated in HTable and passed down layer by layer. Its instantiation, in HTable's constructor, looks like this:

 this.connection = ConnectionManager.getConnectionInternal(conf);

It is obtained via ConnectionManager's static getConnectionInternal() method, built from the configuration conf. Let's continue into its code:

 static ClusterConnection getConnectionInternal(final Configuration conf)
    throws IOException {

    // Build an HConnectionKey from the configuration conf
    HConnectionKey connectionKey = new HConnectionKey(conf);

    synchronized (CONNECTION_INSTANCES) {

      // First look up the HConnectionImplementation connection by HConnectionKey in
      // CONNECTION_INSTANCES, the map from HConnectionKey to HConnectionImplementation
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);

      if (connection == null) {// not present in CONNECTION_INSTANCES

        // Create an HConnectionImplementation via createConnection()
        connection = (HConnectionImplementation)createConnection(conf, true);

        // Store the new HConnectionImplementation under its HConnectionKey
        CONNECTION_INSTANCES.put(connectionKey, connection);
      } else if (connection.isClosed()) {// present, but already closed

        // Call ConnectionManager.deleteConnection() to delete the entry for connectionKey:
        // 1. decCount() decrements the reference count;
        // 2. the entry for connectionKey is removed from CONNECTION_INSTANCES;
        // 3. HConnectionImplementation.internalClose() handles closing the connection
        ConnectionManager.deleteConnection(connectionKey, true);

        // Create a fresh HConnectionImplementation via createConnection()
        connection = (HConnectionImplementation)createConnection(conf, true);

        // Store the new HConnectionImplementation under its HConnectionKey
        CONNECTION_INSTANCES.put(connectionKey, connection);
      }

      // Increment the connection's reference count
      connection.incCount();

      // Return the connection
      return connection;
    }
  }

HConnectionKey is essentially the connection's key: it captures the connection's important settings such as hbase.zookeeper.quorum and hbase.zookeeper.property.clientPort. Obtaining a connection is then straightforward. If a connection with the same key was created before, it is fetched from the CONNECTION_INSTANCES map by its HConnectionKey, its reference count is incremented, and it is returned directly. If none exists, a new one is created from the HConnectionKey and added to CONNECTION_INSTANCES. And if the fetched connection has already been closed, ConnectionManager's deleteConnection() method removes the connectionKey's entry, a new HConnectionImplementation is created, and it is added to CONNECTION_INSTANCES.
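The pooling logic above boils down to a keyed, reference-counted pool. A stripped-down sketch (`ConnectionPool` and `Conn` are our simplifications of ConnectionManager and HConnectionImplementation):

```java
import java.util.HashMap;
import java.util.Map;

// Keyed, reference-counted connection pool in the style of
// getConnectionInternal(): one shared connection per configuration key,
// recreated when the pooled one has been closed.
public class ConnectionPool {
    public static class Conn {
        private int count = 0;
        private boolean closed = false;
        void incCount() { count++; }
        public int getCount() { return count; }
        public void close() { closed = true; }
        public boolean isClosed() { return closed; }
    }

    private final Map<String, Conn> instances = new HashMap<>();

    public synchronized Conn get(String key) {
        Conn conn = instances.get(key);
        if (conn == null || conn.isClosed()) {
            if (conn != null) {
                instances.remove(key); // drop the dead connection
            }
            conn = new Conn();
            instances.put(key, conn);
        }
        conn.incCount(); // every caller bumps the shared reference count
        return conn;
    }
}
```

Callers with the same key (e.g. the same quorum string) share one live connection; a closed one is transparently replaced.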

The implementation class behind ClusterConnection is thus HConnectionImplementation. Back on track, let's continue with Region location and see how the uncached case is handled. Here is HConnectionImplementation's relocateRegion() method:

 @Override
  public RegionLocations relocateRegion(final TableName tableName,
      final byte [] row, int replicaId) throws IOException{
    // Since this is an explicit request not to use any caching, finding
    // disabled tables should not be desirable.  This will ensure that an exception is thrown when
    // the first time a disabled table is interacted with.
    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
    }

    return locateRegion(tableName, row, false, true, replicaId);
  }

It first performs a necessary check: if the table is not the meta table and has been disabled, it throws TableNotEnabledException directly. It then calls locateRegion() to locate the Region, passing useCache = false and retry = true, i.e., skip the cache and retry on failure.
Before analyzing locateRegion(), let's double back to the cached case: it also calls locateRegion(), the only difference being that its useCache flag is true. The two paths converge, so we only need to study locateRegion() itself:

@Override
  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
      boolean retry) throws IOException {
    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }

  @Override
  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
      boolean retry, int replicaId) throws IOException {
    checkClosed();
    if (tableName == null || tableName.getName().length == 0) {
      throw new IllegalArgumentException("table name cannot be null or zero length");
    }
    if (tableName.equals(TableName.META_TABLE_NAME)) {
      return locateMeta(tableName, useCache, replicaId);
    } else {
      // Region not in the cache - have to go to the meta RS
      return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
    }
    }
  }

locateRegion() starts with some necessary checks:

1. It checks whether the connection has been closed (the closed flag); if so, it throws an IOException;

2. It checks the table name tableName; if it is null or zero-length, it throws an IllegalArgumentException.

Then, depending on whether the table is the meta table, it proceeds as follows:

1. For the meta table, it calls locateMeta() directly to do the location;

2. For a non-meta table, whose Region is not in the cache, it must go to the meta RS, calling locateRegionInMeta().

Today we look at the non-meta case first. Here is locateRegionInMeta():

private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
      boolean retry, int replicaId) throws IOException {
    // If we are supposed to be using the cache, look in the cache to see if we already have the
    // region.
    if (useCache) {
      RegionLocations locations = getCachedLocation(tableName, row);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    }
    // build the key of the meta region we should be looking for.
    // the extra 9's on the end are necessary to allow "exact" matches
    // without knowing the precise region names.
    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
    byte[] metaStopKey =
      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
      .setReadType(ReadType.PREAD);

   switch (this.metaReplicaMode) {
      case LOAD_BALANCE:
        int metaReplicaId = this.metaReplicaSelector.select(tableName, row,
          RegionLocateType.CURRENT);
        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
          // If the selector gives a non-primary meta replica region, then go with it.
          // Otherwise, just go to primary in non-hedgedRead mode.
          s.setConsistency(Consistency.TIMELINE);
          s.setReplicaId(metaReplicaId);
        }
        break;
      case HEDGED_READ:
        s.setConsistency(Consistency.TIMELINE);
        break;
      default:
        // do nothing
    }
    int maxAttempts = (retry ? numTries : 1);
    boolean relocateMeta = false;
    for (int tries = 0; ; tries++) {
      if (tries >= maxAttempts) {
        throw new NoServerForRegionException("Unable to find region for "
            + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
      }
      if (useCache) {
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      } else {
        // If we are not supposed to be using the cache, delete any existing cached location
        // so it won't interfere.
        // We are only supposed to clean the cache for the specific replicaId
        metaCache.clearCache(tableName, row, replicaId);
      }
      // Query the meta region
      long pauseBase = this.pause;
      takeUserRegionLock();
      try {
        // We don't need to check if useCache is enabled or not. Even if useCache is false
        // we already cleared the cache for this row before acquiring userRegion lock so if this
        // row is present in cache that means some other thread has populated it while we were
        // waiting to acquire user region lock.
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
        if (relocateMeta) {
          relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
            RegionInfo.DEFAULT_REPLICA_ID);
        }
        s.resetMvccReadPoint();
        try (ReversedClientScanner rcs =
          new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
          boolean tableNotFound = true;
          for (;;) {
            Result regionInfoRow = rcs.next();
            if (regionInfoRow == null) {
              if (tableNotFound) {
                throw new TableNotFoundException(tableName);
              } else {
                throw new IOException(
                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
              }
            }
            tableNotFound = false;
            // convert the row result into the HRegionLocation we need!
            locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
            if (locations == null || locations.getRegionLocation(replicaId) == null) {
              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
            }
            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
            if (regionInfo == null) {
              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
                ", row=" + regionInfoRow);
            }
            // See HBASE-20182. It is possible that we locate to a split parent even after the
            // children are online, so here we need to skip this region and go to the next one.
            if (regionInfo.isSplitParent()) {
              continue;
            }
            if (regionInfo.isOffline()) {
              throw new RegionOfflineException("Region offline; disable table call? " +
                  regionInfo.getRegionNameAsString());
            }
            // It is possible that the split children have not been online yet and we have skipped
            // the parent in the above condition, so we may have already reached a region which does
            // not contains us.
            if (!regionInfo.containsRow(row)) {
              throw new IOException(
                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
            }
            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
            if (serverName == null) {
              throw new NoServerForRegionException("No server address listed in " +
                TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
                " containing row " + Bytes.toStringBinary(row));
            }
            if (isDeadServer(serverName)) {
              throw new RegionServerStoppedException(
                "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
                  " is managed by the server " + serverName + ", but it is dead.");
            }
            // Instantiate the location
            cacheLocation(tableName, locations);
            return locations;
          }
        }
      } catch (TableNotFoundException e) {
        // if we got this error, probably means the table just plain doesn't
        // exist. rethrow the error immediately. this should always be coming
        // from the HTable constructor.
        throw e;
      } catch (LocalConnectionClosedException cce) {
        // LocalConnectionClosedException is specialized instance of DoNotRetryIOE.
        // Thrown when we check if this connection is closed. If it is, don't retry.
        throw cce;
      } catch (IOException e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        if (e instanceof RemoteException) {
          e = ((RemoteException)e).unwrapRemoteException();
        }
        if (e instanceof CallQueueTooBigException) {
          // Give a special check on CallQueueTooBigException, see #HBASE-17114
          pauseBase = this.pauseForCQTBE;
        }
        if (tries < maxAttempts - 1) {
          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " +
            "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e);
        } else {
          throw e;
        }
        // only relocate the parent region if necessary
        relocateMeta =
          !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
      } finally {
        userRegionLock.unlock();
      }
      try {
        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Giving up trying to locate region in " +
          "meta: thread is interrupted.");
      }
    }
  }

locateRegionInMeta() looks up the location of the Region holding a given row of a non-meta table. In essence, it does so by querying HBase's meta table for that row's Region location. Its processing logic is as follows:

1. Depending on the useCache flag: if cache lookups are allowed, first check whether we already have the Region in the cache by calling getCachedLocation() with tableName and row; if it is there, return it, otherwise continue;

2. On a cache miss, build a meta row key: from the table name tableName, the row, and the string "99999999999999" (HConstants.NINES), call createRegionName() to create a region name, metaStartKey;

3. Construct a Scan starting at that metaStartKey over the catalog family, configured as a reversed scan;

4. Determine the retry cap maxAttempts: if the retry flag is true, it is numTries, taken from the hbase.client.retries.number setting (default 31 when unconfigured), otherwise 1;

5. In a loop, while the retry count tries has not reached the cap and the Region location has not been found:

5.1. First check whether tries has reached the cap maxAttempts; if so, throw NoServerForRegionException;

5.2. Depending on whether cache reads are allowed:

5.2.1. If the cache may be used, check it once more each iteration; return on a hit, otherwise continue;

5.2.2. If not, delete any existing cached entry so it cannot interfere with the query, via metaCache's clearCache() method with tableName, row, and replicaId;

5.3. Construct the reversed scanner instance rcs over the meta table, whose name is fixed as hbase:meta, to fetch the lookup result. Note that this step is itself a nested scan: it too must locate a Region by table and row, only the table is now HBase's meta table. Querying the meta table therefore loops back into the meta vs. non-meta if...else branch above; meta-table location itself will be covered shortly;

5.4. Fetch the single result row regionInfoRow via the scanner's next();

5.5. Close the reversed scanner;

5.6. If regionInfoRow is null, throw TableNotFoundException;

5.7. Convert the Result into the RegionLocations we need: regionInfoRow -> locations;

5.8. Extract the Region's info, RegionInfo, from locations;

5.9. Perform the necessary data and state validation, for example:

5.9.1. verify the table name matches;

5.9.2. verify whether the Region has split;

5.9.3. verify whether the Region is offline;

5.9.4. obtain the ServerName for the replicaId from locations and verify the server is not dead;

5.10. Cache the obtained location information locations via cacheLocation(), and return it;

5.11. If an exception occurs along the way, the current thread sleeps for a while and retries; the sleep time depends on pause and tries, and generally grows with each attempt (jitter aside).

That is the whole flow; the remaining details continue in the next installment.
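Step 5.11's growing sleep can be sketched as a backoff-table lookup. The multipliers below follow HBase's HConstants.RETRY_BACKOFF as we understand it, but treat the exact values as illustrative; the real ConnectionUtils.getPauseTime() also adds a small random jitter, omitted here for determinism:

```java
// Sketch of the retry pause from step 5.11: a base pause scaled by a backoff
// multiplier table, so later attempts wait longer, capped at the table's
// largest entry. Jitter is intentionally left out.
public class Backoff {
    // Multipliers modeled on HBase's HConstants.RETRY_BACKOFF (illustrative).
    private static final int[] RETRY_BACKOFF =
        {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200};

    public static long getPauseTime(long pauseMs, int tries) {
        // Attempts beyond the table's end keep using the largest multiplier.
        int index = Math.min(tries, RETRY_BACKOFF.length - 1);
        return pauseMs * RETRY_BACKOFF[index];
    }
}
```

With the default base pause of 100 ms, the fourth retry (tries = 3) would sleep roughly 500 ms, and very late retries level off at about 20 s.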

Summary

That concludes today's content. This article examined how HBase accurately locates the Region and RegionServer holding a row that needs to be read or written, which is the heart of how its data retrieval is routed.
