栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【软件工程实践】Hive研究-Blog3

【软件工程实践】Hive研究-Blog3

【软件工程实践】Hive研究-Blog3

2021SC@SDUSC

研究内容简略介绍

本人负责的是负责的是将查询块QB转换成逻辑查询计划(OP Tree)
如下的代码出自apaceh-hive-3.1.2-src/ql/src/java/org/apache/hadoop/hive/ql/plan中,也就是我的分析目标代码。由于Blog2已经研究了mapper文件夹下的第二个文件EmptyStatsSource.java文件以及mapper文件夹下的第三个文件GroupTransformer.java文件和mapper文件夹下的第四个文件MapBackedStatsSource.java文件,那么我们这周就来研究该文件夹下的剩余源码。

metastoreStatsConnector.java文件代码解析

我们首先附上整个java文件的源码。


package org.apache.hadoop.hive.ql.plan.mapper;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.hive.metastore.api.RuntimeStat;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
import org.apache.hadoop.hive.ql.optimizer.signature.RuntimeStatsMap;
import org.apache.hadoop.hive.ql.optimizer.signature.RuntimeStatsPersister;
import org.apache.hadoop.hive.ql.stats.OperatorStats;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;


class metastoreStatsConnector implements StatsSource {

  private static final Logger LOG = LoggerFactory.getLogger(metastoreStatsConnector.class);

  private final StatsSource ss;

  private ExecutorService executor;

  metastoreStatsConnector(int cacheSize, int batchSize, StatsSource ss) {
    this.ss = ss;
    executor = Executors.newSingleThreadExecutor(
        new BasicThreadFactory.Builder()
            .namingPattern("metastore-RuntimeStats-Loader-%d")
            .daemon(true)
            .build());

    executor.submit(new RuntimeStatsLoader(cacheSize, batchSize));
  }

  private class RuntimeStatsLoader implements Runnable {

    private int maxEntriesToLoad;
    private int batchSize;

    public RuntimeStatsLoader(int maxEntriesToLoad, int batchSize) {
      this.maxEntriesToLoad = maxEntriesToLoad;
      if (batchSize <= 0) {
        this.batchSize = -1;
      } else {
        this.batchSize = batchSize;
      }
    }

    @Override
    public void run() {
      int lastCreateTime = Integer.MAX_VALUE;
      int loadedEntries = 0;
      try {
        do {
          List rs = Hive.get().getMSC().getRuntimeStats(batchSize, lastCreateTime);
          if (rs.size() == 0) {
            break;
          }
          for (RuntimeStat thriftStat : rs) {
            loadedEntries += thriftStat.getWeight();
            lastCreateTime = Math.min(lastCreateTime, thriftStat.getCreateTime() - 1);
            try {
              ss.putAll(decode(thriftStat));
            } catch (IOException e) {
              logException("Exception while loading runtime stats", e);
            }
          }
        } while (batchSize > 0 && loadedEntries < maxEntriesToLoad);
      } catch (TException | HiveException e) {
        logException("Exception while reading metastore runtime stats", e);
      }
    }
  }

  @Override
  public boolean canProvideStatsFor(Class clazz) {
    return ss.canProvideStatsFor(clazz);
  }

  @Override
  public Optional lookup(OpTreeSignature treeSig) {
    return ss.lookup(treeSig);
  }

  @Override
  public void putAll(Map map) {
    if (map.size() == 0) {
      return;
    }
    ss.putAll(map);
    executor.submit(new RuntimeStatsSubmitter(map));
  }

  class RuntimeStatsSubmitter implements Runnable {

    private Map map;

    public RuntimeStatsSubmitter(Map map) {
      this.map = map;
    }

    @Override
    public void run() {
      try {
        RuntimeStat rec = encode(map);
        Hive.get().getMSC().addRuntimeStat(rec);
      } catch (TException | HiveException | IOException e) {
        logException("Exception while persisting runtime stat", e);
      }
    }
  }

  private RuntimeStat encode(Map map) throws IOException {
    String payload = RuntimeStatsPersister.INSTANCE.encode(new RuntimeStatsMap(map));
    RuntimeStat rs = new RuntimeStat();
    rs.setWeight(map.size());
    rs.setPayload(ByteBuffer.wrap(payload.getBytes(Charsets.UTF_8)));
    return rs;
  }

  private Map decode(RuntimeStat rs) throws IOException {
    RuntimeStatsMap rsm = RuntimeStatsPersister.INSTANCE.decode(rs.getPayload(), RuntimeStatsMap.class);
    return rsm.toMap();
  }

  public void destroy() {
    executor.shutdown();
  }

  static void logException(String msg, Exception e) {
    if (LOG.isDebugEnabled()) {
      LOG.debug(msg, e);
    } else {
      LOG.info(msg + ": " + e.getMessage());
    }
  }

}

开始解析。

类构造器方法metastoreStatsConnector
  metastoreStatsConnector(int cacheSize, int batchSize, StatsSource ss) {
    this.ss = ss;
    executor = Executors.newSingleThreadExecutor(
        new BasicThreadFactory.Builder()
            .namingPattern("metastore-RuntimeStats-Loader-%d")
            .daemon(true)
            .build());

    executor.submit(new RuntimeStatsLoader(cacheSize, batchSize));
  }

我们首先从头开始观察整个类的作用是什么,我们注意到类的声明开头有如下语句:class metastoreStatsConnector implements StatsSource ,我们注意到关键词implements,这表示整个metastoreStatsConnector.java文件是接口StatsSource的实现类。我们又注意到一行注释:Decorates a StatSource to be loaded and persisted in the metastore as well.,这是开发人员所标注的内容,它的意思为封装要加载的StatSource对象,使其更加持久化到metastore中。这说明了整个类的目标就为该注释所要完成的事情。

我们回到这个类构造方法中来。我们对暂未使用到的私有变量ss不感兴趣,我们关注后续的语句。我们首先看一下方法Executors.newSingleThreadExecutor()是一个什么方法。经过查阅资料得知,该方法的目的是创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。我们可以阅读该方法的源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new gc(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new linkedBlockingQueue < Runnable > ()));
}

我们下面使用一个案例来直观的了解这个方法:

package com.zhangxueliang.demo.springbootdemo.JUC.c_026_01_ThreadPool;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 

public class T07_SingleThreadPool {
 
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 6; i++) {
            final int j=i;
            executorService.execute(()->{
                System.out.println(Thread.currentThread().getName()+" "+j);
            });
        }
        executorService.shutdown();
    }
}

如下为输出结果:

pool-1-thread-1 0
pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5

我们再来关注方法里面的BasicThreadFactory.builder()这些方法。方法namingPattern里面跟着的参数就是想要设置的线程名称。而对于后面的.daemo()方法,我们得先了解什么是守护线程:
定义:守护线程–也称“服务线程”,在没有用户线程可服务时会自动离开。
优先级:守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务。

我们举一个经典的例子:垃圾回收线程就是一个经典的守护线程,当我们的程序中不再有任何运行的线程时,程序就不会再产生垃圾,垃圾回收器也就无事可做,所以当垃圾回收线程是JVM上仅剩的线程时,垃圾回收线程就会自动离开,而它始终在低级别的状态中运行,用于实时监控和管理系统中可回收资源。

回到源码,这个方法的意思就是传入true或者false,是否将这个要产生的线程设置为守护线程。在源码中传入的参数为true,那么该方法产生的线程就为守护线程。

我们再来关注构造方法最后的executor.submit()语句。显然,我们要关注的是submit()方法,我们查阅资料得知该方法是一个执行任务的方法,并且会有一个执行结果的返回。我们可以使用一个例子来描述该方法:

import java.util.ArrayList;  
import java.util.List;  
import java.util.Random;  
import java.util.concurrent.Callable;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Future;  
  
public class ExecutorServiceTest {  
    public static void main(String[] args) {  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        List> resultList = new ArrayList>();  
  
        // 创建10个任务并执行  
        for (int i = 0; i < 10; i++) {  
            // 使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中  
            Future future = executorService.submit(new TaskWithResult(i));  
            // 将任务执行结果存储到List中  
            resultList.add(future);  
        }  
        executorService.shutdown();  
  
        // 遍历任务的结果  
        for (Future fs : resultList) {  
            try {  
                System.out.println(fs.get()); // 打印各个线程(任务)执行的结果  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (ExecutionException e) {  
                executorService.shutdownNow();  
                e.printStackTrace();  
                return;  
            }  
        }  
    }  
}  
  
class TaskWithResult implements Callable {  
    private int id;  
  
    public TaskWithResult(int id) {  
        this.id = id;  
    }  
  
      
    public String call() throws Exception {  
        System.out.println("call()方法被自动调用,干活!!!             " + Thread.currentThread().getName());  
        if (new Random().nextBoolean())  
            throw new TaskException("Meet error in task." + Thread.currentThread().getName());  
        // 一个模拟耗时的操作  
        for (int i = 999999999; i > 0; i--)  
            ;  
        return "call()方法被自动调用,任务的结果是:" + id + "    " + Thread.currentThread().getName();  
    }  
}  
  
class TaskException extends Exception {  
    public TaskException(String message) {  
        super(message);  
    }  
}  

该例子的输出结果为:

call()方法被自动调用,干活!!!             pool-1-thread-1  
call()方法被自动调用,干活!!!             pool-1-thread-2  
call()方法被自动调用,干活!!!             pool-1-thread-3  
call()方法被自动调用,干活!!!             pool-1-thread-5  
call()方法被自动调用,干活!!!             pool-1-thread-7  
call()方法被自动调用,干活!!!             pool-1-thread-4  
call()方法被自动调用,干活!!!             pool-1-thread-6  
call()方法被自动调用,干活!!!             pool-1-thread-7  
call()方法被自动调用,干活!!!             pool-1-thread-5  
call()方法被自动调用,干活!!!             pool-1-thread-8  
call()方法被自动调用,任务的结果是:0    pool-1-thread-1  
call()方法被自动调用,任务的结果是:1    pool-1-thread-2  
java.util.concurrent.ExecutionException: com.cicc.pts.TaskException: Meet error in task.pool-1-thread-3  
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)  
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)  
    at com.cicc.pts.ExecutorServiceTest.main(ExecutorServiceTest.java:29)  
Caused by: com.cicc.pts.TaskException: Meet error in task.pool-1-thread-3  
    at com.cicc.pts.TaskWithResult.call(ExecutorServiceTest.java:57)  
    at com.cicc.pts.TaskWithResult.call(ExecutorServiceTest.java:1)  
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)  
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)  
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)  
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)  
    at java.lang.Thread.run(Thread.java:619)  

而对于sumbit方法里面的new RuntimeStatsLoader(cacheSize, batchSize)语句,我从需要的参数cacheSize(缓存规模)和batchSize(批处理规模)中可以猜测这个RuntimeStatsLoader类是一个任务处理的类,需要指定缓存和批处理规模来执行任务。在下文我们会见到关于这个类的详细介绍。

至此,整个方法的目的也清楚明了了:创建一个安全的线程并且自定义其名称,方便调用且设置其为守护线程,完成资源自动回收的功能,并且给定缓存规模和批处理规模并执行任务,得到任务执行的结果(包括是否执行成功等信息)。

实现类的类构造器方法RuntimeStatsLoader

我们来看一下源码。

    public RuntimeStatsLoader(int maxEntriesToLoad, int batchSize) {
      this.maxEntriesToLoad = maxEntriesToLoad;
      if (batchSize <= 0) {
        this.batchSize = -1;
      } else {
        this.batchSize = batchSize;
      }
    }

我们先来看看这个Runnable是什么样的接口,因为RuntimeStatsLoader实现的就为Runnable的方法。经过查阅资料得知Tread是类且实现了Runnable接口。那么我们就可以知道本类实现的也和Thread类实现的方法类似,都与线程有关。

那么回到源码,我们知道batchSize是批处理的规模,而maxEntriesToLoad则为最大的装载条目数量。整个构造方法的目的我们也大致清楚了:自定义最大装载条目数量maxEntriesToload和批处理规模batchSize,然后判断批处理规模的大小并作出对应的操作。

方法run
    @Override
    public void run() {
      int lastCreateTime = Integer.MAX_VALUE;
      int loadedEntries = 0;
      try {
        do {
          List rs = Hive.get().getMSC().getRuntimeStats(batchSize, lastCreateTime);
          if (rs.size() == 0) {
            break;
          }
          for (RuntimeStat thriftStat : rs) {
            loadedEntries += thriftStat.getWeight();
            lastCreateTime = Math.min(lastCreateTime, thriftStat.getCreateTime() - 1);
            try {
              ss.putAll(decode(thriftStat));
            } catch (IOException e) {
              logException("Exception while loading runtime stats", e);
            }
          }
        } while (batchSize > 0 && loadedEntries < maxEntriesToLoad);
      } catch (TException | HiveException e) {
        logException("Exception while reading metastore runtime stats", e);
      }
    }
  }

首先我们关注这个lastCreateTime变量,它的取值为Integer.MAX_VALUE,而这个Integer.MAX_VALUE表示int数据类型的最大取值数:2147483647,该变量的作用在后面。我们来看一下这个语句
List rs = Hive.get().getMSC().getRuntimeStats(batchSize, lastCreateTime);
我们先关注的是里面的getRuntimeStats(batchSize,lastCreateTime)起到什么作用。经过查阅资料得知,该方法返回任务使用CPU的时间,而传入的batchSize参数应该是作为处理次数,也就是决定List规模的参数,lastCreateTIme则作为初始化的参考数值传入进行任务执行。

进入到第一个分支语块,如果这个队列为空,那么自然而然就应该结束操作。如果不为空,我们就会使用一个for循环对这个队列里面的每一个元素进行遍历和处理。我们来看for循环里面的语句。首先对于每一个在rs队列里面的元素,都会使得变量loadedEntries自增,而自增的值为每个元素的权重值,即方法getWeight()的返回值,也就是占用CPU运行任务的时间。然后lastCreateTime则更新为创建任务时所需的时间的最小值(队列当中)。接着,再执行putAll方法。对于这个putAll方法以及里面使用的decode方法我们放在下文进行讲解。而catch语句就是打印我们想要输出的标识语句以及具体的报错信息,这些没有什么特别之处。而后续的while语句是为了防止空转时内存过载导致程序崩溃的设计,确实非常精妙。

方法canProvideStatsFor
  @Override
  public boolean canProvideStatsFor(Class clazz) {
    return ss.canProvideStatsFor(clazz);
  }

这里要注意的就是方法体的canProvideStatsFor方法,我们在Blog1就对其进行了解析。我们先来看一下这个方法的源码:

  @Override
  public boolean canProvideStatsFor(Class clazz) {
    if (cache.size() > 0 && Operator.class.isAssignableFrom(clazz)) {
      return true;
    }
    return false;
  }

其中,对于cache.size()方法,我们很容易得知该方法时返回cache也就是我们该开始预设的缓冲块的大小。而后面的这个Operator.class.isAssignableFrom()方法就需要查阅资料。查阅资料得知:当前Class对象如果是参数Class对象的父类,父接口,或者是相同,都会返回true。也就是说,如果当传入的Class对象符合要求,才会返回true,也就是相当于提供了一层保险机制,相当于变相的说明了这个方法是部分private的,十分聪明的写法。那么整个方法的意思就清楚了:只有当传入的Class对象为当前Class对象的弗雷,父接口或者相同时,和设置的代码块大小大于0的情况下,才能返回true,否则全部返回false。

方法lookup
  @Override
  public Optional lookup(OpTreeSignature treeSig) {
    return ss.lookup(treeSig);
  }

这个方法我们同样的在Blog1中已经进行解析了。我们可以看一下该方法的源码。

 @Override
  public Optional lookup(OpTreeSignature treeSig) {
    return Optional.ofNullable(cache.getIfPresent(treeSig));
  }

那么,这个Optional.ofNullable()是什么方法呢?
这个方法的意思如下:

1.首先执行ofNullable()方法,如果T对象为空,执行empty()方法;不为空,执行of(value)方法;

2.empty()方法,初始化一个空对象Optional(空对象和null不是一回事哈);

3.of(value)方法,将泛型对象T用于Optional构造方法的参数上,返回一个有值的对象

4.经过上面两步,从而保证了Optional不为null,避免了空指针;

那么,在方法里面的另外一个方法getIfPresent(treeSig)是什么呢?
又经过查阅资料,我知道了他的用法。

getIfPresent(key):从现有的缓存中获取,如果缓存中有key,则返回value,如果没有则返回null。

那么整个方法的意思就明了了:先从缓存中看有没有目标"treeSig"对应的值,如果有就返回没有就返回Null,然后再调用Optional.ofNullable()方法返回一个不为空的指针,防止抛出空指针异常的错误导致整个进程奔溃,或许加上try-catch语句可以解决。

方法putAll
  @Override
  public void putAll(Map map) {
    if (map.size() == 0) {
      return;
    }
    ss.putAll(map);
    executor.submit(new RuntimeStatsSubmitter(map));
  }

这个方法我们还是在Blog1中解析过了,但是只是解析了其中的一部分,我们现在结合Blog1的内容继续对该类下的putAll方法进行解析。

首先我们观察一下第一个if分支语句,它判断传入的参数map的规模是否为0,若为0则结束方法返回,这是显而易见的处理方法,空的映射是无法处理的。如果传入的map参数不为空,则调用真正的putAll方法。我们现在来看一下putAll方法的源码

  @Override
  public void putAll(Map map) {
    for (Entry entry : map.entrySet()) {
      put(entry.getKey(), entry.getValue());
    }
  }

我们先来看一下put方法是什么样的一个方法。

  public void put(OpTreeSignature sig, OperatorStats opStat) {
    cache.put(sig, opStat);
  }

cache.put();
这是一个什么方法?我们继续查阅相关资料。经过查阅,我们得到如下信息:

那么我们知道了,这个方法就是一个设置键值对的作用,那么整个put方法的作用也是如此。

我们再来看看整个map.entrySet()方法是一个什么方法。经过查阅资料我们可以得知:该方法返回的是此映射中包含的映射的 Set 视图。
简单来说,就是把所有的映射关系都呈现出来。而看起来十分奇怪的语句
Entry entry : map.entrySet()是什么意思呢?我们再次经过查阅之后,发现这是一个遍历Map的方式。

那么整个方法的作用我们也清楚了:遍历整个map里面的键值对,把键值对放到entry里面。

我们回到原来的putAll方法,来看最后一句

executor.submit(new RuntimeStatsSubmitter(map));

我们在Blog2中就已经解释了submit方法了,这是一个提交任务并执行,返回执行结果的方法,那么这句话的作用就是将传入的参数map作为任务提交给CPU,由CPU执行然后返回执行结果,虽然这个执行结果是否需要并不重要,重要的是执行任务。

小结

本周的Hive源码解析任务暂时完成了,我学习到了关于Hive的底层逻辑,对Hive有了更高的理解。期待下周对Hive的源码解析能够学习到更多的东西。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/342596.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号