栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021SC@SDUSC hbase代码分析(七)Coprocessor分析(1)

2021SC@SDUSC hbase代码分析(七)Coprocessor分析(1)

2021SC@SDUSC hbase代码分析(七)Coprocessor分析(1)

2021SC@SDUSC 2021SC@SDUSC
2021SC@SDUSC 2021SC@SDUSC

目录
  • 2021SC@SDUSC hbase代码分析(七)Coprocessor分析(1)
    • Coprocessor机制优点
    • Coprocessor分类
    • Observer Coprocessor相关源码分析
      • Observer Coprocessor工作流程:
      • 工作流程源码分析
        • RegionObserver
        • RegionServerObserver
        • WALObserver
        • MasterObserver
    • 未完待续。。。

Coprocessor官方解释

​ 据官方介绍 What we have built is a framework that provides a library and runtime environment for executing user code within the Hbase region server and master processes. ,Hbase Coprocessor相当于是基于RegionServer-Master的MapReduce编程框架,其中RegionServer充当mapper的角色,而Master充当reducer的角色。Hbase自0.92开始提供coprocessors机制

MapReduce 和 Coprocessor有一样的操作原则,计算向数据靠拢。

简单来说,Coprocessor是一个框架,这个框架可以让你很容易地在Region Server运行你的业务逻辑代码。

Coprocessor机制优点

Hbase使用Coprocessor机制,使用户可以将自己编写的程序运行在RegionServer上。大多数情况下Hbase用户并不需要这个功能,通过调用Hbase提供的读写API或者使用Bulkload功能基本可以满足日常的业务需求。但在部分特殊应用场景下,使用Coprocessor可以大幅度提升业务的执行效率。

假如业务需要从Hbase集群加载出来几十亿行数据进行求和运算或是求平均值运算,如果调用Hbase的API进行数据的处理,势必会有以下几个问题:

  1. 大量数据传输可能会成为瓶颈,这就导致了整个业务的执行效率可能受限于数据的传输效率。
  2. 客户端内存可能会因为无法存储如此大量的数据而OOM
  3. 大量数据传输可能将集群宽带耗尽,严重影响集群中其他业务的正常读写。

在这种场景下,如果能够讲客户端的计算代码迁移到RegionServer服务器端执行,就能很好的避免上述问题,在保证不影响其他业务的情况下提升计算效率。

Coprocessor分类

  1. 其中Observer在一个特定的事件发生前或发生后触发。(类似于MySQL中的触发器)

    • 在事件发生前触发的Coprocessor需要重写以pre作为前缀的方法,比如prePut。
    • 在事件发生后触发的Coprocessor使用方法以post作为前缀,比如postPut。
  2. Endpoint是一个远程rpc调用,类似于webservice形式调用,但他不适用xml,而是使用的序列化框架是protobuf(序列化后数据更小)

Observer Coprocessor相关源码分析 Observer Coprocessor工作流程:

(图片来自Coprocessor官方讲解)

Observer Coprocessor提供钩子使用户代码在特定事件发生前或者之后执行。

例如,想在get方法之前调用你的代码逻辑,你可以重写preGetOp方法。

同理,想在get方法之后调用你的代码逻辑,你可以重写*postGetOp方法。

工作流程源码分析

在当前Hbase系统中,主要提供了四种Observer接口:

RegionObserver

主要监听Region相关事件,比如getputscandelete以及flush等。

相关源码:(仅列出部分、其它的原理相同)

default void preGetOp(ObserverContext c, Get get, List result)
    throws IOException {}
  public boolean preGet(final Get get, final List results) throws IOException {
    if (coprocEnvironments.isEmpty()) {
      return false;
    }
    boolean bypassable = true;
    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
      @Override
      public void call(RegionObserver observer) throws IOException {
          //调用RegionObserver抽象类中的preGetOp方法
        observer.preGetOp(this, get, results);
      }
    });
  }
default void postGetOp(ObserverContext c, Get get,
    List result) throws IOException {}
public void postGet(final Get get, final List results)
    throws IOException {
  if (coprocEnvironments.isEmpty()) {
    return;
  }
  execOperation(new RegionObserverOperationWithoutResult() {
    @Override
    public void call(RegionObserver observer) throws IOException {
        //调用RegionObserver抽象类中的postGetOp方法
      observer.postGetOp(this, get, results);
    }
  });
}
RegionServerObserver

主要监听RegionServer相关事件,比如RegionServer的启动、关闭、或者执行Region合并等事件。

相关源码:(以关闭RegionServer为例)

default void preStopRegionServer(
  final ObserverContext ctx) throws IOException {}
public void preStop(String message, User user) throws IOException {
  // While stopping the region server all coprocessors method should be executed first then the
  // coprocessor should be cleaned up.
  if (coprocEnvironments.isEmpty()) {
    return;
  }
  execShutdown(new RegionServerObserverOperation(user) {
    @Override
    public void call(RegionServerObserver observer) throws IOException {
        //调用RegionServerObserver抽象类的方法
      observer.preStopRegionServer(this);
    }

    @Override
    public void postEnvCall() {
      //调用执行coprocessor关闭程序
      shutdown(this.getEnvironment());
    }
  });
}
public void shutdown(E e) {
  assert e instanceof baseEnvironment;
  if (LOG.isDebugEnabled()) {
    LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
  }
  ((baseEnvironment) e).shutdown();
}

执行以上关闭程序后,会清除环境:

public void shutdown() {
  if (state == Coprocessor.State.ACTIVE) {
    state = Coprocessor.State.STOPPING;
    Thread currentThread = Thread.currentThread();
    ClassLoader hostClassLoader = currentThread.getContextClassLoader();
    try {
      currentThread.setContextClassLoader(this.getClassLoader());
      impl.stop(this);
      state = Coprocessor.State.STOPPED;
    } catch (IOException ioe) {
      LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
    } finally {
      currentThread.setContextClassLoader(hostClassLoader);
    }
  } else {
    LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
        " because not active (state="+state.toString()+")");
  }
}
WALObserver

主要监听WAL相关事件,比如WAL写入、滚动等。

相关源码分析:(以写入为例)

@Deprecated
default void preWALWrite(ObserverContext ctx,
      RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}
public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
    throws IOException {
  // Not bypassable.
  if (this.coprocEnvironments.isEmpty()) {
    return;
  }
  execOperation(new WALObserverOperation() {
    @Override
    public void call(WALObserver oserver) throws IOException {
        //调用WALObserver抽象类的方法
      oserver.preWALWrite(this, info, logKey, logEdit);
    }
  });
}
MasterObserver

主要监听Master相关事件,比如建表、删表以及修改表结构等。

default void preCreateTable(final ObserverContext ctx,
    TableDescriptor desc, RegionInfo[] regions) throws IOException {}
public void preCreateTable(final TableDescriptor htd, final RegionInfo[] regions)
    throws IOException {
  execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
    @Override
    public void call(MasterObserver observer) throws IOException {
      observer.preCreateTable(this, htd, regions);
    }
  });
}
未完待续。。。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/457504.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号