上周我们分析了org.apache.hadoop.mapreduce.Counters中的的核心代码,本周将继续对Counters部分进行分析。在对Counters类有初步了解的基础上,我们继续分析与Counters相关的源码,并对counters的使用进行实践。
上一周我们对mapreduce核心类counters计数器进行分析,初步探讨了它的作用、构造函数,并通过编写简单的自定义Counters完成了对文件错误记录与全部记录的统计,加深了了解。
同时我们提到 Counter有"组group"的概念,用于表示逻辑上相同范围的所有数值。MapReduce job提供的默认Counter分为三个组:
Map-Reduce frameword
Map input records,Map skipped records,Map input bytes,Map output records,Map output bytes,Combine input records,Combine output records,Reduce input records,Reduce input groups,Reduce output records,Reduce skipped groups,Reduce skipped records,Spilled records
File Systems
FileSystem bytes read,FileSystem bytes written
Job Counters
Launched map tasks,Launched reduce tasks,Failed map tasks,Failed reduce tasks,Data-local map tasks,Rack-local map tasks,Other local map tasks
本次我们将对这三个分组展开具体的分析。
Map-Reduce framework这个Counter group包含了相当多地job执行细节数据。这里需要有个概念认识是:一般情况下,record就表示一行数据,而相对地byte表示这行数据的大小是 多少,这里的group表示经过reduce merge后像这样的输入形式{“aaa”, [5, 8, 2, …]}。
接下来我们对其中涉及到的counters依次进行分析:
Combine input records
Combiner是为了减少尽量减少需要拉取和移动的数据,所以combine输入条数与map的输出条数是一致的。
Combine output records
经过Combiner后,相同key的数据经过压缩,在map端自己解决了很多重复数据,表示最终在map端中间文件中的所有条目数
Failed Shuffles
copy线程在抓取map端中间数据时,如果因为网络连接异常或是IO异常,所引起的shuffle错误次数
GC time elapsed(ms)
通过JMX获取到执行map与reduce的子JVM总共的GC时间消耗
Map input records
所有map task从HDFS读取的文件总行数
Map output records
map task的直接输出record是多少,就是在map方法中调用context.write的次数,也就是未经过Combine时的原生输出条数
Map output bytes
Map的输出结果key/value都会被序列化到内存缓冲区中,所以这里的bytes指序列化后的最终字节之和
Merged Map outputs
记录着shuffle过程中总共经历了多少次merge动作
Reduce input groups
Reduce总共读取了多少个这样的groups
Reduce input records
如果有Combiner的话,那么这里的数值就等于map端Combiner运算后的最后条数,如果没有,那么就应该等于map的输出条数
Reduce output records
所有reduce执行后输出的总条目数
Reduce shuffle bytes
Reduce端的copy线程总共从map端抓取了多少的中间数据,表示各个map task最终的中间文件总和
Shuffled Maps
每个reduce几乎都得从所有map端拉取数据,每个copy线程拉取成功一个map的数据,那么增1,所以它的总数基本等于 reduce number * map number
Spilled Records
spill过程在map和reduce端都会发生,这里统计在总共从内存往磁盘中spill了多少条数据
SPLIT_RAW_BYTES
与map task 的split相关的数据都会保存于HDFS中,而在保存时元数据也相应地存储着数据是以怎样的压缩方式放入的,它的具体类型是什么,这些额外的数据是 MapReduce框架加入的,与job无关,这里记录的大小就是表示额外信息的字节大小
File Systems
MapReduce job执行所依赖的数据来自于不同的文件系统,这个group表示job与文件系统交互的读写统计。
FILE_BYTES_READ
job读取本地文件系统的文件字节数。假定我们当前map的输入数据都来自于HDFS,那么在map阶段,这个数据应该是0。但reduce在执行前,它 的输入数据是经过shuffle的merge后存储在reduce端本地磁盘中,所以这个数据就是所有reduce的总输入字节数。
FILE_BYTES_WRITTEN
map的中间结果都会spill到本地磁盘中,在map执行完后,形成最终的spill文件。所以map端这里的数据就表示map task往本地磁盘中总共写了多少字节。与map端相对应的是,reduce端在shuffle时,会不断地拉取map端的中间结果,然后做merge并 不断spill到自己的本地磁盘中。最终形成一个单独文件,这个文件就是reduce的输入文件。
HDFS_BYTES_READ
整个job执行过程中,只有map端运行时,才从HDFS读取数据,这些数据不限于源文件内容,还包括所有map的split元数据。所以这个值应该比FileInputFormatCounters.BYTES_READ 要略大些。
HDFS_BYTES_WRITTEN
Reduce的最终结果都会写入HDFS,就是一个job执行结果的总量。
Job Counters
这个group描述与job调度相关的统计。
Data-local map tasks
Job在被调度时,如果启动了一个data-local(源文件的幅本在执行map task的taskTracker本地)
FALLOW_SLOTS_MILLIS_MAPS
当前job为某些map task的执行保留了slot,总共保留的时间是多少
FALLOW_SLOTS_MILLIS_REDUCES
与上面类似
SLOTS_MILLIS_MAPS
所有map task占用slot的总时间,包含执行时间和创建/销毁子JVM的时间
SLOTS_MILLIS_REDUCES
与上面类似
Launched map tasks
此job启动了多少个map task
Launched reduce tasks
此job启动了多少个reduce task
上面我们分析了MapReduce job提供的默认Counter的三个分组。
下面我们结合具体的例子进一步分析。
源码中已经给我们提供了一个CounterGroup。
我们以源码中org.apache.hadoop.mapreduce.TaskCounter这个CounterGroup为例,源码如下:
package org.apache.hadoop.mapreduce;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
// Counters used by Task classes
@InterfaceAudience.Public
@InterfaceStability.Evolving
public enum TaskCounter {
MAP_INPUT_RECORDS,
MAP_OUTPUT_RECORDS,
MAP_SKIPPED_RECORDS,
MAP_OUTPUT_BYTES,
MAP_OUTPUT_MATERIALIZED_BYTES,
SPLIT_RAW_BYTES,
COMBINE_INPUT_RECORDS,
COMBINE_OUTPUT_RECORDS,
REDUCE_INPUT_GROUPS,
REDUCE_SHUFFLE_BYTES,
REDUCE_INPUT_RECORDS,
REDUCE_OUTPUT_RECORDS,
REDUCE_SKIPPED_GROUPS,
REDUCE_SKIPPED_RECORDS,
SPILLED_RECORDS,
SHUFFLED_MAPS,
FAILED_SHUFFLE,
MERGED_MAP_OUTPUTS,
GC_TIME_MILLIS,
CPU_MILLISECONDS,
PHYSICAL_MEMORY_BYTES,
VIRTUAL_MEMORY_BYTES,
COMMITTED_HEAP_BYTES,
MAP_PHYSICAL_MEMORY_BYTES_MAX,
MAP_VIRTUAL_MEMORY_BYTES_MAX,
REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
}
TaskCounter.java所在源码目录下有一个对应的TaskCounter.properties文件 ,该properties文件是用来为TaskCounter这个CounterGroup做资源绑定的,简单来说就是做显示用的。
如下方代码就是TaskCounter.properties文件的内容,和web页面显示的是一样的。
# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR ConDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ResourceBundle properties file for Map-Reduce counters CounterGroupName= Map-Reduce framework MAP_INPUT_RECORDS.name= Map input records MAP_OUTPUT_RECORDS.name= Map output records MAP_OUTPUT_BYTES.name= Map output bytes MAP_OUTPUT_MATERIALIZED_BYTES.name= Map output materialized bytes MAP_SKIPPED_RECORDS.name= Map skipped records COMBINE_INPUT_RECORDS.name= Combine input records COMBINE_OUTPUT_RECORDS.name= Combine output records REDUCE_INPUT_GROUPS.name= Reduce input groups REDUCE_SHUFFLE_BYTES.name= Reduce shuffle bytes REDUCE_INPUT_RECORDS.name= Reduce input records REDUCE_OUTPUT_RECORDS.name= Reduce output records REDUCE_SKIPPED_RECORDS.name= Reduce skipped records REDUCE_SKIPPED_GROUPS.name= Reduce skipped groups SPLIT_RAW_BYTES.name= Input split bytes SPILLED_RECORDS.name= Spilled Records SHUFFLED_MAPS.name= Shuffled Maps FAILED_SHUFFLE.name= Failed Shuffles MERGED_MAP_OUTPUTS.name= Merged Map outputs GC_TIME_MILLIS.name= GC time elapsed (ms) COMMITTED_HEAP_BYTES.name= Total committed heap usage (bytes) CPU_MILLISECONDS.name= CPU time spent (ms) PHYSICAL_MEMORY_BYTES.name= Physical memory (bytes) snapshot VIRTUAL_MEMORY_BYTES.name= Virtual memory (bytes) snapshot MAP_PHYSICAL_MEMORY_BYTES_MAX.name= Peak Map Physical memory (bytes) MAP_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Map Virtual memory (bytes) REDUCE_PHYSICAL_MEMORY_BYTES_MAX.name=Peak Reduce Physical memory (bytes) REDUCE_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Reduce Virtual memory (bytes)
properties文件的命名以enum文件名一样,CounterGroupName就是该CounterGroup在页面上显示的CounterGroup名字。
每个具体的enum如何绑定显示名字呢?以MAP_INPUT_RECORDS为例,MAP_INPUT_RECORDS.name=Map input records就能改变其对应的显示名字。
新建SortCounter.java枚举类,内容如下:
SortCounter.properties文件,内容如下:
运行MR任务后会看到如下信息,在ResourceManager的web页面也能查看:
我们可以在map方法中使用我们自定义的counters:
本次我们首先分析了Counters类的默认三个分组—Map-Reduce frameword、File Systems、Job Counters。然后我们在分析了源码自带的counters实例的基础上,自定义了counters实例,并在map方法中使用,加深了对counters计数器类的了解。



