2021SC@SDUSC
研究内容简略介绍本人负责的是负责的是将查询块QB转换成逻辑查询计划(OP Tree)
如下的代码出自apaceh-hive-3.1.2-src/ql/src/java/org/apache/hadoop/hive/ql/plan中,也就是我的分析目标代码。由于Blog2已经研究了mapper文件夹下的第二个文件EmptyStatsSource.java文件以及mapper文件夹下的第三个文件GroupTransformer.java文件和mapper文件夹下的第四个文件MapBackedStatsSource.java文件,那么我们这周就来研究该文件夹下的剩余源码。
我们首先附上整个java文件的源码。
package org.apache.hadoop.hive.ql.plan.mapper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.hive.metastore.api.RuntimeStat;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
import org.apache.hadoop.hive.ql.optimizer.signature.RuntimeStatsMap;
import org.apache.hadoop.hive.ql.optimizer.signature.RuntimeStatsPersister;
import org.apache.hadoop.hive.ql.stats.OperatorStats;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
class metastoreStatsConnector implements StatsSource {
private static final Logger LOG = LoggerFactory.getLogger(metastoreStatsConnector.class);
private final StatsSource ss;
private ExecutorService executor;
metastoreStatsConnector(int cacheSize, int batchSize, StatsSource ss) {
this.ss = ss;
executor = Executors.newSingleThreadExecutor(
new BasicThreadFactory.Builder()
.namingPattern("metastore-RuntimeStats-Loader-%d")
.daemon(true)
.build());
executor.submit(new RuntimeStatsLoader(cacheSize, batchSize));
}
private class RuntimeStatsLoader implements Runnable {
private int maxEntriesToLoad;
private int batchSize;
public RuntimeStatsLoader(int maxEntriesToLoad, int batchSize) {
this.maxEntriesToLoad = maxEntriesToLoad;
if (batchSize <= 0) {
this.batchSize = -1;
} else {
this.batchSize = batchSize;
}
}
@Override
public void run() {
int lastCreateTime = Integer.MAX_VALUE;
int loadedEntries = 0;
try {
do {
List rs = Hive.get().getMSC().getRuntimeStats(batchSize, lastCreateTime);
if (rs.size() == 0) {
break;
}
for (RuntimeStat thriftStat : rs) {
loadedEntries += thriftStat.getWeight();
lastCreateTime = Math.min(lastCreateTime, thriftStat.getCreateTime() - 1);
try {
ss.putAll(decode(thriftStat));
} catch (IOException e) {
logException("Exception while loading runtime stats", e);
}
}
} while (batchSize > 0 && loadedEntries < maxEntriesToLoad);
} catch (TException | HiveException e) {
logException("Exception while reading metastore runtime stats", e);
}
}
}
@Override
public boolean canProvideStatsFor(Class> clazz) {
return ss.canProvideStatsFor(clazz);
}
@Override
public Optional lookup(OpTreeSignature treeSig) {
return ss.lookup(treeSig);
}
@Override
public void putAll(Map map) {
if (map.size() == 0) {
return;
}
ss.putAll(map);
executor.submit(new RuntimeStatsSubmitter(map));
}
class RuntimeStatsSubmitter implements Runnable {
private Map map;
public RuntimeStatsSubmitter(Map map) {
this.map = map;
}
@Override
public void run() {
try {
RuntimeStat rec = encode(map);
Hive.get().getMSC().addRuntimeStat(rec);
} catch (TException | HiveException | IOException e) {
logException("Exception while persisting runtime stat", e);
}
}
}
private RuntimeStat encode(Map map) throws IOException {
String payload = RuntimeStatsPersister.INSTANCE.encode(new RuntimeStatsMap(map));
RuntimeStat rs = new RuntimeStat();
rs.setWeight(map.size());
rs.setPayload(ByteBuffer.wrap(payload.getBytes(Charsets.UTF_8)));
return rs;
}
private Map decode(RuntimeStat rs) throws IOException {
RuntimeStatsMap rsm = RuntimeStatsPersister.INSTANCE.decode(rs.getPayload(), RuntimeStatsMap.class);
return rsm.toMap();
}
public void destroy() {
executor.shutdown();
}
static void logException(String msg, Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug(msg, e);
} else {
LOG.info(msg + ": " + e.getMessage());
}
}
}
开始解析。
类构造器方法metastoreStatsConnector metastoreStatsConnector(int cacheSize, int batchSize, StatsSource ss) {
this.ss = ss;
executor = Executors.newSingleThreadExecutor(
new BasicThreadFactory.Builder()
.namingPattern("metastore-RuntimeStats-Loader-%d")
.daemon(true)
.build());
executor.submit(new RuntimeStatsLoader(cacheSize, batchSize));
}
我们首先从头开始观察整个类的作用是什么,我们注意到类的声明开头有如下语句:class metastoreStatsConnector implements StatsSource ,我们注意到关键词implements,这表示整个metastoreStatsConnector.java文件是接口StatsSource的实现类。我们又注意到一行注释:Decorates a StatSource to be loaded and persisted in the metastore as well.,这是开发人员所标注的内容,它的意思为封装要加载的StatSource对象,使其更加持久化到metastore中。这说明了整个类的目标就为该注释所要完成的事情。
我们回到这个类构造方法中来。我们对暂未使用到的私有变量ss不感兴趣,我们关注后续的语句。我们首先看一下方法Executors.newSingleThreadExecutor()是一个什么方法。经过查阅资料得知,该方法的目的是创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。我们可以阅读该方法的源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new gc(1, 1,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue < Runnable > ()));
}
我们下面使用一个案例来直观的了解这个方法:
package com.zhangxueliang.demo.springbootdemo.JUC.c_026_01_ThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class T07_SingleThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 6; i++) {
final int j=i;
executorService.execute(()->{
System.out.println(Thread.currentThread().getName()+" "+j);
});
}
executorService.shutdown();
}
}
如下为输出结果:
pool-1-thread-1 0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1 5
我们再来关注方法里面的BasicThreadFactory.builder()这些方法。方法namingPattern里面跟着的参数就是想要设置的线程名称。而对于后面的.daemo()方法,我们得先了解什么是守护线程:
定义:守护线程–也称“服务线程”,在没有用户线程可服务时会自动离开。
优先级:守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务。
我们举一个经典的例子:垃圾回收线程就是一个经典的守护线程,当我们的程序中不再有任何运行的线程时,程序就不会再产生垃圾,垃圾回收器也就无事可做,所以当垃圾回收线程是JVM上仅剩的线程时,垃圾回收线程就会自动离开,而它始终在低级别的状态中运行,用于实时监控和管理系统中可回收资源。
回到源码,这个方法的意思就是传入true或者false,是否将这个要产生的线程设置为守护线程。在源码中传入的参数为true,那么该方法产生的线程就为守护线程。
我们再来关注构造方法最后的executor.submit()语句。显然,我们要关注的是submit()方法,我们查阅资料得知该方法是一个执行任务的方法,并且会有一个执行结果的返回。我们可以使用一个例子来描述该方法:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorServiceTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
List> resultList = new ArrayList>();
// 创建10个任务并执行
for (int i = 0; i < 10; i++) {
// 使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
Future future = executorService.submit(new TaskWithResult(i));
// 将任务执行结果存储到List中
resultList.add(future);
}
executorService.shutdown();
// 遍历任务的结果
for (Future fs : resultList) {
try {
System.out.println(fs.get()); // 打印各个线程(任务)执行的结果
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
executorService.shutdownNow();
e.printStackTrace();
return;
}
}
}
}
class TaskWithResult implements Callable {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() throws Exception {
System.out.println("call()方法被自动调用,干活!!! " + Thread.currentThread().getName());
if (new Random().nextBoolean())
throw new TaskException("Meet error in task." + Thread.currentThread().getName());
// 一个模拟耗时的操作
for (int i = 999999999; i > 0; i--)
;
return "call()方法被自动调用,任务的结果是:" + id + " " + Thread.currentThread().getName();
}
}
class TaskException extends Exception {
public TaskException(String message) {
super(message);
}
}
该例子的输出结果为:
call()方法被自动调用,干活!!! pool-1-thread-1
call()方法被自动调用,干活!!! pool-1-thread-2
call()方法被自动调用,干活!!! pool-1-thread-3
call()方法被自动调用,干活!!! pool-1-thread-5
call()方法被自动调用,干活!!! pool-1-thread-7
call()方法被自动调用,干活!!! pool-1-thread-4
call()方法被自动调用,干活!!! pool-1-thread-6
call()方法被自动调用,干活!!! pool-1-thread-7
call()方法被自动调用,干活!!! pool-1-thread-5
call()方法被自动调用,干活!!! pool-1-thread-8
call()方法被自动调用,任务的结果是:0 pool-1-thread-1
call()方法被自动调用,任务的结果是:1 pool-1-thread-2
java.util.concurrent.ExecutionException: com.cicc.pts.TaskException: Meet error in task.pool-1-thread-3
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
at com.cicc.pts.ExecutorServiceTest.main(ExecutorServiceTest.java:29)
Caused by: com.cicc.pts.TaskException: Meet error in task.pool-1-thread-3
at com.cicc.pts.TaskWithResult.call(ExecutorServiceTest.java:57)
at com.cicc.pts.TaskWithResult.call(ExecutorServiceTest.java:1)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
而对于sumbit方法里面的new RuntimeStatsLoader(cacheSize, batchSize)语句,我从需要的参数cacheSize(缓存规模)和batchSize(批处理规模)中可以猜测这个RuntimeStatsLoader类是一个任务处理的类,需要指定缓存和批处理规模来执行任务。在下文我们会见到关于这个类的详细介绍。
至此,整个方法的目的也清楚明了了:创建一个安全的线程并且自定义其名称,方便调用且设置其为守护线程,完成资源自动回收的功能,并且给定缓存规模和批处理规模并执行任务,得到任务执行的结果(包括是否执行成功等信息)。
实现类的类构造器方法RuntimeStatsLoader我们来看一下源码。
public RuntimeStatsLoader(int maxEntriesToLoad, int batchSize) {
this.maxEntriesToLoad = maxEntriesToLoad;
if (batchSize <= 0) {
this.batchSize = -1;
} else {
this.batchSize = batchSize;
}
}
我们先来看看这个Runnable是什么样的接口,因为RuntimeStatsLoader实现的就为Runnable的方法。经过查阅资料得知Tread是类且实现了Runnable接口。那么我们就可以知道本类实现的也和Thread类实现的方法类似,都与线程有关。
那么回到源码,我们知道batchSize是批处理的规模,而maxEntriesToLoad则为最大的装载条目数量。整个构造方法的目的我们也大致清楚了:自定义最大装载条目数量maxEntriesToload和批处理规模batchSize,然后判断批处理规模的大小并作出对应的操作。
方法run @Override
public void run() {
int lastCreateTime = Integer.MAX_VALUE;
int loadedEntries = 0;
try {
do {
List rs = Hive.get().getMSC().getRuntimeStats(batchSize, lastCreateTime);
if (rs.size() == 0) {
break;
}
for (RuntimeStat thriftStat : rs) {
loadedEntries += thriftStat.getWeight();
lastCreateTime = Math.min(lastCreateTime, thriftStat.getCreateTime() - 1);
try {
ss.putAll(decode(thriftStat));
} catch (IOException e) {
logException("Exception while loading runtime stats", e);
}
}
} while (batchSize > 0 && loadedEntries < maxEntriesToLoad);
} catch (TException | HiveException e) {
logException("Exception while reading metastore runtime stats", e);
}
}
}
首先我们关注这个lastCreateTime变量,它的取值为Integer.MAX_VALUE,而这个Integer.MAX_VALUE表示int数据类型的最大取值数:2147483647,该变量的作用在后面。我们来看一下这个语句
List
我们先关注的是里面的getRuntimeStats(batchSize,lastCreateTime)起到什么作用。经过查阅资料得知,该方法返回任务使用CPU的时间,而传入的batchSize参数应该是作为处理次数,也就是决定List规模的参数,lastCreateTIme则作为初始化的参考数值传入进行任务执行。
进入到第一个分支语块,如果这个队列为空,那么自然而然就应该结束操作。如果不为空,我们就会使用一个for循环对这个队列里面的每一个元素进行遍历和处理。我们来看for循环里面的语句。首先对于每一个在rs队列里面的元素,都会使得变量loadedEntries自增,而自增的值为每个元素的权重值,即方法getWeight()的返回值,也就是占用CPU运行任务的时间。然后lastCreateTime则更新为创建任务时所需的时间的最小值(队列当中)。接着,再执行putAll方法。对于这个putAll方法以及里面使用的decode方法我们放在下文进行讲解。而catch语句就是打印我们想要输出的标识语句以及具体的报错信息,这些没有什么特别之处。而后续的while语句是为了防止空转时内存过载导致程序崩溃的设计,确实非常精妙。
方法canProvideStatsFor @Override
public boolean canProvideStatsFor(Class> clazz) {
return ss.canProvideStatsFor(clazz);
}
这里要注意的就是方法体的canProvideStatsFor方法,我们在Blog1就对其进行了解析。我们先来看一下这个方法的源码:
@Override
public boolean canProvideStatsFor(Class> clazz) {
if (cache.size() > 0 && Operator.class.isAssignableFrom(clazz)) {
return true;
}
return false;
}
其中,对于cache.size()方法,我们很容易得知该方法时返回cache也就是我们该开始预设的缓冲块的大小。而后面的这个Operator.class.isAssignableFrom()方法就需要查阅资料。查阅资料得知:当前Class对象如果是参数Class对象的父类,父接口,或者是相同,都会返回true。也就是说,如果当传入的Class对象符合要求,才会返回true,也就是相当于提供了一层保险机制,相当于变相的说明了这个方法是部分private的,十分聪明的写法。那么整个方法的意思就清楚了:只有当传入的Class对象为当前Class对象的弗雷,父接口或者相同时,和设置的代码块大小大于0的情况下,才能返回true,否则全部返回false。
方法lookup@Override public Optionallookup(OpTreeSignature treeSig) { return ss.lookup(treeSig); }
这个方法我们同样的在Blog1中已经进行解析了。我们可以看一下该方法的源码。
@Override public Optionallookup(OpTreeSignature treeSig) { return Optional.ofNullable(cache.getIfPresent(treeSig)); }
那么,这个Optional.ofNullable()是什么方法呢?
这个方法的意思如下:
1.首先执行ofNullable()方法,如果T对象为空,执行empty()方法;不为空,执行of(value)方法;
2.empty()方法,初始化一个空对象Optional(空对象和null不是一回事哈);
3.of(value)方法,将泛型对象T用于Optional构造方法的参数上,返回一个有值的对象
4.经过上面两步,从而保证了Optional不为null,避免了空指针;
那么,在方法里面的另外一个方法getIfPresent(treeSig)是什么呢?
又经过查阅资料,我知道了他的用法。
getIfPresent(key):从现有的缓存中获取,如果缓存中有key,则返回value,如果没有则返回null。
那么整个方法的意思就明了了:先从缓存中看有没有目标"treeSig"对应的值,如果有就返回没有就返回Null,然后再调用Optional.ofNullable()方法返回一个不为空的指针,防止抛出空指针异常的错误导致整个进程奔溃,或许加上try-catch语句可以解决。
方法putAll@Override public void putAll(Mapmap) { if (map.size() == 0) { return; } ss.putAll(map); executor.submit(new RuntimeStatsSubmitter(map)); }
这个方法我们还是在Blog1中解析过了,但是只是解析了其中的一部分,我们现在结合Blog1的内容继续对该类下的putAll方法进行解析。
首先我们观察一下第一个if分支语句,它判断传入的参数map的规模是否为0,若为0则结束方法返回,这是显而易见的处理方法,空的映射是无法处理的。如果传入的map参数不为空,则调用真正的putAll方法。我们现在来看一下putAll方法的源码
@Override public void putAll(Mapmap) { for (Entry entry : map.entrySet()) { put(entry.getKey(), entry.getValue()); } }
我们先来看一下put方法是什么样的一个方法。
public void put(OpTreeSignature sig, OperatorStats opStat) {
cache.put(sig, opStat);
}
cache.put();
这是一个什么方法?我们继续查阅相关资料。经过查阅,我们得到如下信息:
那么我们知道了,这个方法就是一个设置键值对的作用,那么整个put方法的作用也是如此。
我们再来看看整个map.entrySet()方法是一个什么方法。经过查阅资料我们可以得知:该方法返回的是此映射中包含的映射的 Set 视图。
简单来说,就是把所有的映射关系都呈现出来。而看起来十分奇怪的语句
Entry
那么整个方法的作用我们也清楚了:遍历整个map里面的键值对,把键值对放到entry里面。
我们回到原来的putAll方法,来看最后一句
executor.submit(new RuntimeStatsSubmitter(map));
我们在Blog2中就已经解释了submit方法了,这是一个提交任务并执行,返回执行结果的方法,那么这句话的作用就是将传入的参数map作为任务提交给CPU,由CPU执行然后返回执行结果,虽然这个执行结果是否需要并不重要,重要的是执行任务。
小结本周的Hive源码解析任务暂时完成了,我学习到了关于Hive的底层逻辑,对Hive有了更高的理解。期待下周对Hive的源码解析能够学习到更多的东西。



