栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hive企业级性能问题排查及调优

Hive企业级性能问题排查及调优

Hive 企业级性能优化

Hive 作为大数据平台举足轻重的框架,以其稳定性和简单易用性也成为当前构建企业级数据仓库时使用最多的框架之一。

但是如果我们只局限于会使用 Hive,而不考虑性能问题,就难搭建出一个完美的数仓,所以 Hive 性能调优是我们大数据从业者必须掌握的技能。

本节将给大家讲解 Hive 性能调优的一些方法及技巧。

Hive性能问题排查的方式

当我们发现一条 SQL 语句执行时间过长或者不合理时,我们就要考虑对 SQL 进行优化,优化首先得进行问题排查,那么我们可以通过哪些方式进行排查呢。

经常使用关系型数据库的同学可能知道关系型数据库的优化的诀窍-看执行计划。如 Oracle 数据库,它有多种类型的执行计划,通过多种执行计划的配合使用,可以看到根据统计信息推演的执行计划,即 Oracle 推断出来的未真正运行的执行计划;还可以看到实际执行任务的执行计划;能够观察到从数据读取到最终呈现的主要过程和中间的量化数据。可以说,在 Oracle 开发领域,掌握合适的环节,选用不同的执行计划,SQL 调优就不是一件难事。

Hive 中也有执行计划,但是 Hive 的执行计划都是预测的,这点不像 Oracle 和SQL Server 有真实的计划,可以看到每个阶段的处理数据、消耗的资源和处理的时间等量化数据。

Hive 提供的执行计划没有这些数据,这意味着虽然 Hive 的使用者知道整个 SQL 的执行逻辑,但是各阶段耗用的资源状况和整个 SQL 的执行瓶颈在哪里是不清楚的。

**想要知道 HiveSQL 所有阶段的运行信息,可以查看 YARN 提供的日志。**查看日志的链接,可以在每个作业执行后,在控制台打印的信息中找到。

如下图所示:

Hive 提供的执行计划目前可以查看的信息有以下几种:

  1. 查看执行计划的基本信息,即 explain;
  2. 查看执行计划的扩展信息,即 explain extended;
  3. 查看 SQL 数据输入依赖的信息,即 explain dependency;
  4. 查看 SQL 操作相关权限的信息,即 explain authorization;
  5. 查看 SQL 的向量化描述信息,即 explain vectorization。

在查询语句的 SQL 前面加上关键字 explain 是查看执行计划的基本方法。 用explain 打开的执行计划包含以下两部分:

  • 作业的依赖关系图,即 STAGE DEPENDENCIES;
  • 每个作业的详细信息,即 STAGE PLANS。

不懂hive中的explain,说明hive还没入门,学会explain,能够给我们工作中使用hive带来极大的便利!

⭐Hive 中的 explain 执行计划详解:

  1. explain 的用法及参数

(1)HIVE提供了EXPLAIN命令来展示一个查询的执行计划,这个执行计划对于我们了解底层原理,hive 调优,排查数据倾斜等很有帮助;

EXPLAIN[EXTENDED|CBO|AST|DEPENDENCY|AUTHORIZATION|LOCKS|VECTORIZATION|ANALYZE] query

explain 后面可以跟以下可选参数,注意:这几个可选参数不是 hive 每个版本都支持的!

  • EXTENDED:加上 extended 可以输出有关计划的额外信息。这通常是物理信息,例如文件名。这些额外信息对我们用处不大;
  • CBO:输出由Calcite优化器生成的计划。CBO 从 hive 4.0.0 版本开始支持;
  • AST:输出查询的抽象语法树。AST 在hive 2.1.0 版本删除了,存在bug,转储AST可能会导致OOM错误,将在4.0.0版本修复;
  • DEPENDENCY:dependency在EXPLAIN语句中使用会产生有关计划中输入的额外信息。它显示了输入的各种属性;
  • **AUTHORIZATION:**显示所有的实体需要被授权执行(如果存在)的查询和授权失败;
  • LOCKS:这对于了解系统将获得哪些锁以运行指定的查询很有用。LOCKS 从 hive 3.2.0 开始支持;
  • **VECTORIZATION:**将详细信息添加到EXPLAIN输出中,以显示为什么未对Map和Reduce进行矢量化。从 Hive 2.3.0 开始支持;
  • ANALYZE:用实际的行数注释计划。从 Hive 2.2.0 开始支持

在 hive cli 中输入以下命令(hive 2.3.7):

explain select sum(id) from test1;

得到结果(请逐行看完,即使看不懂也要每行都看):

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: test1
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: int)
              outputColumnNames: id
              Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: sum(id)
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  sort order:
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: sum(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

看完以上内容有什么感受,是不是感觉都看不懂,不要着急,下面将会详细讲解每个参数,相信你学完下面的内容之后再看 explain 的查询结果将游刃有余。

一个HIVE查询被转换为一个由一个或多个stage组成的序列(有向无环图DAG)。这些stage可以是MapReduce stage,也可以是负责元数据存储的stage,也可以是负责文件系统的操作(比如移动和重命名)的stage。

我们将上述结果拆分看,先从最外层开始,包含两个大的部分:

  • stage dependencies: 各个stage之间的依赖性
  • stage plan: 各个stage的执行计划

先看第一部分 stage dependencies ,包含两个 stage,Stage-1 是根stage,说明这是开始的stage,Stage-0 依赖 Stage-1,Stage-1执行完成后执行Stage-0。

再看第二部分 stage plan,里面有一个 Map Reduce,一个MR的执行计划分为两个部分:

  • Map Operator Tree: MAP端的执行计划树
  • Reduce Operator Tree: Reduce端的执行计划树

这两个执行计划树里面包含这条sql语句的 operator:

    1. map端第一个操作肯定是加载表,所以就是 TableScan 表扫描操作,常见的属性:
      • alias: 表名称
      • Statistics: 表统计信息,包含表中数据条数,数据大小等;
    1. Select Operator: 选取操作,常见的属性 :
    • expressions:需要的字段名称及字段类型;
    • outputColumnNames:输出的列名称;
    • Statistics:表统计信息,包含表中数据条数,数据大小等;
    1. Group By Operator:分组聚合操作,常见的属性:
      • aggregations:显示聚合函数信息;
      • mode:聚合模式,值有 hash:随机聚合,就是hash partition;partial:局部聚合;final:最终聚合;
      • keys:分组的字段,如果没有分组,则没有此字段;
      • outputColumnNames:聚合之后输出列名;
      • Statistics: 表统计信息,包含分组聚合之后的数据条数,数据大小等;
    1. Reduce Output Operator:输出到reduce操作,常见属性:
      • sort order:值为空 不排序;值为 + 正序排序,值为 - 倒序排序;值为 ± 排序的列为两列,第一列为正序,第二列为倒序;
    1. Filter Operator:过滤操作,常见的属性:
      • predicate:过滤条件,如sql语句中的where id>=1,则此处显示(id >= 1);
    1. Map Join Operator:join 操作,常见的属性:
      • condition map:join方式 ,如Inner Join 0 to 1 Left Outer Join0 to 2;
      • keys: join 的条件字段;
      • outputColumnNames: join 完成之后输出的字段;
      • Statistics: join 完成之后生成的数据条数,大小等;
    1. File Output Operator:文件输出操作,常见的属性:
      • compressed:是否压缩;
      • table:表的信息,包含输入输出文件格式化方式,序列化方式等;
    1. Fetch Operator 客户端获取数据操作,常见的属性:
      • limit,值为 -1 表示不限制条数,其他值为限制的条数

好,学到这里再翻到上面 explain 的查询结果,是不是感觉基本都能看懂了。

⭐3. explain 在生产实践中的案例

1. join 语句会过滤 null 的值吗?

现在,我们在hive cli 输入以下查询计划语句:

select a.id,b.user_name from test1 a join test2 b on a.id=b.id;

问:上面这条 join 语句会过滤 id 为 null 的值吗?

explain select a.id,b.user_name from test1 a join test2 b on a.id=b.id;

我们来看结果 (为了适应页面展示,仅截取了部分输出信息):

TableScan
 alias: a
 Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
 Filter Operator
    predicate: id is not null (type: boolean)
    Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
    Select Operator
        expressions: id (type: int)
        outputColumnNames: _col0
        Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
        HashTable Sink Operator
           keys:
             0 _col0 (type: int)
             1 _col0 (type: int)
 ...

从上述结果可以看到 predicate: id is not null 这样一行,说明 join 时会自动过滤掉关联字段为 null 值的情况,但 left join 或 full join 是不会自动过滤的。

2. group by 分组语句会进行排序吗?

看下面这条sql:

select id,max(user_name) from test1 group by id;

问:group by 分组语句会进行排序吗?

直接来看 explain 之后结果 (为了适应页面展示,仅截取了部分输出信息):

TableScan
    alias: test1
    Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
    Select Operator
        expressions: id (type: int), user_name (type: string)
        outputColumnNames: id, user_name
        Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
        Group By Operator
           aggregations: max(user_name)
           keys: id (type: int)
           mode: hash
           outputColumnNames: _col0, _col1
           Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
           Reduce Output Operator
             key expressions: _col0 (type: int)
             sort order: +
             Map-reduce partition columns: _col0 (type: int)
             Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
             value expressions: _c

3. 哪条sql执行效率高呢?

观察两条sql语句:

SELECT
    a.id,
    b.user_name
FROM
    test1 a
JOIN test2 b ON a.id = b.id
WHERe
    a.id > 2;
SELECt
    a.id,
    b.user_name
FROM
    (SELECt * FROM test1 WHERe id > 2) a
JOIN test2 b ON a.id = b.id;

这两条sql语句输出的结果是一样的,但是哪条sql执行效率高呢?

有人说第一条sql执行效率高,因为第二条sql有子查询,子查询会影响性能 ,有人说第二条sql执行效率高,因为先过滤之后,在进行join时的条数减少了,所以执行效率就高了.

到底哪条sql效率高呢,我们直接在sql语句前面加上 explain,看下执行计划不就知道了!

在第一条sql语句前加上 explain,得到如下结果:

hive (default)> explain select a.id,b.user_name from test1 a join test2 b on a.id=b.id where a.id >2;
OK
Explain
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:a
          TableScan
            alias: a
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int)
                outputColumnNames: _col0
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int), user_name (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)
                  outputColumnNames: _col0, _col2
                  Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col2 (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

在第二条sql语句前加上 explain,得到如下结果:

hive (default)> explain select a.id,b.user_name from(select * from  test1 where id>2 ) a join test2 b on a.id=b.id;
OK
Explain
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:test1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:test1
          TableScan
            alias: test1
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int)
                outputColumnNames: _col0
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int), user_name (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)
                  outputColumnNames: _col0, _col2
                  Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col2 (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

我们发现,除了表别名不一样,其他的执行计划完全一样,都是先进行 where 条件过滤,在进行 join 条件关联。

说明 hive 底层会自动帮我们进行优化,所以这两条sql语句执行效率是一样的。

以上仅列举了3个我们生产中既熟悉又有点迷糊的例子,explain 还有很多其他的用途,如查看stage的依赖情况、排查数据倾斜、hive 调优等。

使用 explain 查看执行计划是 Hive 性能调优中非常重要的一种方式,请务必掌握!

总结

Hive 对 SQL 语句性能问题排查的方式:

  1. 使用 explain 查看执行计划;
  2. 查看 YARN提供的日志;
Hive调优的方式?⭐

为什么都说性能优化这项工作是比较难的,因为一项技术的优化,必然是一项综合性的工作,它是多门技术的结合。我们如果只局限于一种技术,那么肯定做不好优化的。

下面将文章将从多个完全不同的角度来介绍 Hive 优化的多样性。

1. SQL语句优化

SQL 语句优化涉及到的内容太多,因篇幅有限,不能一一介绍到,所以就拿几个典型举例:

1. union all

insert into table stu partition(tp)
select s_age,max(s_birth) stat,'max' tp
from stu_ori
group by s_age
union all
insert into table stu partition(tp)
select s_age,min(s_birth) stat,'min' tp
from stu_ori
group by s_age

我们简单分析上面的 SQl 语句,就是将每个年龄的最大和最小的生日获取出来放到同一张表中,union all 前后的两个语句都是对同一张表按照 s_age 进行分组,然后分别取最大值和最小值。对同一张表相同的字段进行两次分组,这造成了极大浪费,我们能不能改造下呢,当然是可以的,为大家介绍一个语法:
from …
insert into …

这个语法将 from 前置,作用就是使用一张表,可以进行多次插入操作:

--开启动态分区
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
from stu_ori
insert into table stu partition(tp)
select s_age,max(s_birth) stat,'max' tp
group by s_age
insert into table stu partition(tp)
select s_age,min(s_birth) stat,'min' tp
group by s_age;

上面的 SQL 就可以对 stu_ori 表的 s_age 字段分组一次而进行两次不同的插入操作。

这个例子告诉我们一定要多了解 SQL 语句,如果我们不知道这种语法,一定不会想到这种方式的;

2. . distinct

先看一个 SQL,去重计数:

select count(1)
from(
select s_age
from stu
group by s_age
) b;

这是简单统计年龄的枚举值个数,为什么不用 distinct?

select count(distinct s_age)
from stu;

有人说因为在数据量特别大的情况下使用第一种方式能够有效避免 Reduce 端的数据倾斜,但是事实如此吗?

我们先不管数据量特别大这个问题,就当前的业务和环境下使用 distinct 一定会比上面那种子查询的方式效率高。原因有以下几点:

  • 上面进行去重的字段是年龄字段,要知道年龄的枚举值是非常有限的,就算计算 1 岁到 100 岁之间的年龄,s_age 的最大枚举值才是 100,如果转化成 MapReduce 来解释的话,在 Map 阶段,每个 Map 会对 s_age 去重。由于s_age枚举值有限,因而每个Map得到的s_age也有限,最终得到reduce的数据量也就是 map 数量*s_age 枚举值的个数;
  • distinct 的命令会在内存中构建一个 hashtable,查找去重的时间复杂度是 O(1);group by 在不同版本间变动比较大,有的版本会用构建hashtable 的形式去重,有的版本会通过排序的方式, 排序最优时间复杂度无法到 O(1)。另外,第一种方式(group by)去重会转化为两个任务,会消耗更多的磁盘网络 I/O 资源。
  • 最新的 Hive 3.0 中新增了 count(distinct ) 优化,通过配置hive.optimize.countdistinct,即使真的出现数据倾斜也可以自动优化,自动改变 SQL 执行的逻辑。
  • 第二种方式(distinct)比第一种方式(group by)代码简洁,表达的意思简单明了,如果没有特殊的问题,代码简洁就是优;

这个例子告诉我们,有时候我们不要过度优化,调优讲究适时调优,过早进行调优有可能做的是无用功甚至产生负效应,在调优上投入的工作成本和回报不成正比。

调优需要遵循一定的原则。

CPU 时间:表示运行程序所占用服务器 CPU 资源的时间。
用户等待耗时:记录的是用户从提交作业到返回结果期间用户等待的所有时间。
查询 TextFile 类型的数据表耗时 33 分钟, 查询 ORC 类型的表耗时 1 分 52 秒,时
间得以极大缩短,可见不同的数据存储格式也能给 HiveSQL 性能带来极大的影

Hive 提供了多种数据存储组织格式,不同格式对程序的运行效率也会有极大的影响。

Hive 提供的格式有 TEXT、SequenceFile、RCFile、ORC 和 Parquet 等。

SequenceFile 是一个二进制 key/value 对结构的平面文件,在早期的 Hadoop 平台上被广泛用于 MapReduce 输出/输出格式,以及作为数据存储格式。

Parquet 是一种列式数据存储格式,可以兼容多种计算引擎,如 MapRedcue 和Spark 等,对多层嵌套的数据结构提供了良好的性能支持,是目前 Hive 生产环境中数据存储的主流选择之一。

ORC 优化是对 RCFile 的一种优化,它提供了一种高效的方式来存储 Hive 数据,同时也能够提高 Hive 的读取、写入和处理数据的性能,能够兼容多种计算引擎。

事实上,在实际的生产环境中,ORC 已经成为了 Hive 在数据存储上的主流选择之一

我们使用同样数据及 SQL 语句,只是数据存储格式不同,得到如下执行时长:


CPU 时间:表示运行程序所占用服务器 CPU 资源的时间。
用户等待耗时:记录的是用户从提交作业到返回结果期间用户等待的所有时间。

查询 TextFile 类型的数据表耗时 33 分钟, 查询 ORC 类型的表耗时 1 分 52 秒,时间得以极大缩短,可见不同的数据存储格式也能给 HiveSQL 性能带来极大的影响;

3. 小文件过多优化

小文件如果过多,对 hive 来说,在进行查询时,每个小文件都会当成一个块,启动一个 Map 任务来完成,而一个 Map 任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的 Map 数量是受限的。

所以我们有必要对小文件过多进行优化,关于小文件过多的解决的办法有哪些呢?

小文件产生的原因:

hive 中的小文件肯定是向 hive 表中导入数据时产生,所以先看下向 hive 中导入数据的几种方式:

  • 直接向表中插入数据
insert into table A values (1,'zhangsan',88),(2,'lisi',61);

这种方式每次插入时都会产生一个文件,多次插入少量数据就会出现多个小文件,但是这种方式生产环境很少使用,可以说基本没有使用的。

  • 通过load方式加载数据
-- 导入文件
load data local inpath '/export/score.csv' overwrite into table A  

-- 导入文件夹
load data local inpath '/export/score' overwrite into table A   

使用 load 方式可以导入文件或文件夹,当导入一个文件时,hive表就有一个文件,当导入文件夹时,hive表的文件数量为文件夹下所有文件的数量。

  • 通过查询方式加载数据
insert overwrite table A  select s_id,c_name,s_score from B;

这种方式是生产环境中常用的,也是最容易产生小文件的方式。

insert 导入数据时会启动 MR 任务,MR中 reduce 有多少个就输出多少个文件。

所以, 文件数量=ReduceTask数量*分区数;

也有很多简单任务没有reduce,只有map阶段,则
文件数量=MapTask数量*分区数;

每执行一次 insert 时hive中至少产生一个文件,因为 insert 导入时至少会有一个MapTask。

像有的业务需要每10分钟就要把数据同步到 hive 中,这样产生的文件就会很多。

小文件过多产生的影响:

  • 首先对底层存储HDFS来说,HDFS本身就不适合存储大量小文件,小文件过多会导致namenode元数据特别大, 占用太多内存,严重影响HDFS的性能;
  • 对 hive 来说,在进行查询时,每个小文件都会当成一个块,启动一个Map任务来完成,而一个Map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的Map数量是受限的。

怎么解决小文件过多:

  1. 使用 hive 自带的 concatenate 命令,自动合并小文件;

使用方法:

#对于非分区表
alter table A concatenate;

#对于分区表
alter table B partition(day=20201224) concatenate;

举例:

#向 A 表中插入数据
hive (default)> insert into table A values (1,'aa',67),(2,'bb',87);
hive (default)> insert into table A values (3,'cc',67),(4,'dd',87);
hive (default)> insert into table A values (5,'ee',67),(6,'ff',87);

#执行以上三条语句,则A表下就会有三个小文件,在hive命令行执行如下语句
#查看A表下文件数量
hive (default)> dfs -ls /user/hive/warehouse/A;
Found 3 items
-rwxr-xr-x   3 root supergroup        378 2020-12-24 14:46 /user/hive/warehouse/A/000000_0
-rwxr-xr-x   3 root supergroup        378 2020-12-24 14:47 /user/hive/warehouse/A/000000_0_copy_1
-rwxr-xr-x   3 root supergroup        378 2020-12-24 14:48 /user/hive/warehouse/A/000000_0_copy_2

#可以看到有三个小文件,然后使用 concatenate 进行合并
hive (default)> alter table A concatenate;

#再次查看A表下文件数量
hive (default)> dfs -ls /user/hive/warehouse/A;
Found 1 items
-rwxr-xr-x   3 root supergroup        778 2020-12-24 14:59 /user/hive/warehouse/A/000000_0

#已合并成一个文件

注意:
1、concatenate 命令只支持 RCFILE 和 ORC 文件类型。
2、使用concatenate命令合并小文件时不能指定合并后的文件数量,但可以多次执行该命令。
3、当多次使用concatenate后文件数量不在变化,这个跟参数 mapreduce.input.fileinputformat.split.minsize=256mb 的设置有关,可设定每个文件的最小size。

  1. 调整参数减少Map数量
  • 设置map输入合并小文件的相关参数
#执行Map前进行小文件合并
#CombineHiveInputFormat底层是 Hadoop的 CombineFileInputFormat 方法
#此方法是在mapper中将多个文件合成一个split作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 默认

#每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;   -- 256M

#一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;  -- 100M

#一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;  -- 100M
  • 设置map输出和reduce输出进行合并的相关参数
#设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true;

#设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true;

#设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000;   -- 256M

#当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge
set hive.merge.smallfiles.avgsize=16000000;   -- 16M 
  • 启用压缩
# hive的查询结果输出是否进行压缩
set hive.exec.compress.output=true;

# MapReduce Job的结果输出是否使用压缩
set mapreduce.output.fileoutputformat.compress=true;
  1. 减少Reduce的数量
#reduce 的个数决定了输出的文件的个数,所以可以调整reduce的个数控制hive表的文件数量,
#hive中的分区函数 distribute by 正好是控制MR中partition分区的,
#然后通过设置reduce的数量,结合分区函数让数据均衡的进入每个reduce即可。

#设置reduce的数量有两种方式,第一种是直接设置reduce个数
set mapreduce.job.reduces=10;

#第二种是设置每个reduce的大小,Hive会根据数据总大小猜测确定一个reduce个数
set hive.exec.reducers.bytes.per.reducer=5120000000; -- 默认是1G,设置为5G

#执行以下语句,将数据均衡的分配到reduce中
set mapreduce.job.reduces=10;
insert overwrite table A partition(dt)
select * from B
distribute by rand();

解释:如设置reduce数量为10,则使用 rand(), 随机生成一个数 x % 10 ,
这样数据就会随机进入 reduce 中,防止出现有的文件过大或过小;
  1. 使用hadoop的archive将小文件归档

Hadoop Archive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问;

#用来控制归档是否可用
set hive.archive.enabled=true;
#通知Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;
#控制需要归档文件的大小
set har.partfile.size=1099511627776;

#使用以下命令进行归档
ALTER TABLE A ARCHIVE PARTITION(dt='2020-12-24', hr='12');

#对已归档的分区恢复为原文件
ALTER TABLE A UNARCHIVE PARTITION(dt='2020-12-24', hr='12');

归档的分区可以查看不能 insert overwrite,必须先 unarchive;

如果是新集群,没有历史遗留问题的话,建议hive使用 orc 文件格式,以及启用 lzo 压缩。这样小文件过多可以使用hive自带命令 concatenate 快速合并。

4. 并行执行优化

Hive 会将一个查询转化成一个或者多个阶段。这样的阶段可以是 MapReduce 阶段、抽样阶段、合并阶段、limit 阶段。或者 Hive 执行过程中可能需要的其他阶段。默认情况下,Hive 一次只会执行一个阶段。不过,某个特定的 job 可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个 job 的执行时间缩短。如果有更多的阶段可以并行执行,那么 job 可能就越快完成。

通过设置参数 hive.exec.parallel 值为 true,就可以开启并发执行。在共享集群中,需要注意下,如果 job 中并行阶段增多,那么集群利用率就会增加。

set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; 
//同一个 sql 允许最大并行度,默认为 8

当然得是在系统资源比较空闲的时候才有优势,否则没资源,并行也起不来。

5. JVM优化

JVM 重用是 Hadoop 调优参数的内容,其对 Hive 的性能具有非常大的影响,特别是对于很难避免小文件的场景或 task 特别多的场景,这类场景大多数执行时间都很短。

Hadoop 的默认配置通常是使用派生 JVM 来执行 map 和 Reduce 任务的。这时 JVM的启动过程可能会造成相当大的开销,尤其是执行的 job 包含有成百上千 task任务的情况。JVM 重用可以使得 JVM 实例在同一个 job 中重新使用 N 次。N 的值可以在 Hadoop 的 mapred-site.xml 文件中进行配置。通常在 10-20 之间,具体多少需要根据具体业务场景测试得出。


mapreduce.job.jvm.numtasks
10
How many tasks to run per jvm. If set to -1, there is
no limit.


我们也可以在 hive 中设置:

set mapred.job.reuse.jvm.num.tasks=10; 
//这个设置来设置我们的 jvm 重用

这个功能的缺点是,开启 JVM 重用将一直占用使用到的 task 插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job 中有某几个 reduce task 执行的时间要比其他 Reduce task 消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的 job 使用,直到所有的 task 都结束了才会释放。

6. 推测执行优化

在分布式集群环境下,因为程序 Bug(包括 Hadoop 本身的 bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。

为了避免这种情况发生,Hadoop 采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。

设置开启推测执行参数:Hadoop 的 mapred-site.xml 文件中进行配置:


mapreduce.map.speculative
true
If true, then multiple instances of some map tasks
may be executed in parallel.


mapreduce.reduce.speculative
true
If true, then multiple instances of some reduce tasks
may be executed in parallel.

hive 本身也提供了配置项来控制 reduce-side 的推测执行:

set hive.mapred.reduce.tasks.speculative.execution=true

关于调优这些推测执行变量,还很难给一个具体的建议。如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要执行长时间的 map 或者 Reduce task 的话,那么启动推测执行造成的浪费是非常巨大的。

总结

代码优化原则:

  • 理透需求原则,这是优化的根本;
  • 把握数据全链路原则,这是优化的脉络;
  • 坚持代码的简洁原则,这让优化更加简单;
  • 没有瓶颈时谈论优化,这是自寻烦恼;
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612378.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号