Hadoop 中的HDFS和MapReduce都是针对大文件来设计的。在HDFS中,DataNode在启动的时候需要每个block 的信息都要上报给NameNode,每个block 的信息占用150字节,如果我们存储了一堆都是几KB的小文件,最后发现NameNode的内存占满了,确实存储了很多文件,但是文件的总体大小却很小,这样就失去了HDFS存在的价值。在MapReduce中每个block 需要启动一个Map任务,启动Map任务很耗资源,但是由于数据量小执行map任务时间却很短,这样就会造成任务执行消耗的时间还没有启动任务消耗的时间多,这样也会影响MapReduce执行的效率。所以通常的做法是使用容器压缩小文件。
4.1.2 常见的容器概念
SequenceFile
SequenceFile是Hadoop 提供的一种二进制文件,这种这种二进制文件直接将
MapFile
MapFile是排序后的SequenceFile,MapFile由两部分组成,分别是index和data
index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。
数据准备
private static void genderate_smallFile(){
String fileDir="E:\smallfile";
File file = new File(fileDir);
if(!file.exists()){
file.mkdir();
}
int i=1;
while(i<51){
String fielAbsolutePath = fileDir+File.separator+"file"+i+".txt";
try(FileWriter fw = new FileWriter(fielAbsolutePath);
BufferedWriter bfw = new BufferedWriter(fw);
){
bfw.write("hello world");
i++;
}catch (Exception e){
e.printStackTrace();
}
}
}
SequenceFile实现小文件存储代码实现
package com.imooc.mr;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.File;
public class SmallFileSeq {
public static void main(String[] args) throws Exception{
//生成SequenceFile文件
write("E:\smallfile","/seqFile");
// 读取SequenceFile 文件
read("/seqFile");
}
private static void write(String inputDir,String outputFile) throws Exception{
// 创建配置对象
Configuration conf = new Configuration();
// 指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
// 获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);
// 删除HDFS 上的输出文件 保证函数可以重复执行
fileSystem.delete(new Path(outputFile),true);
SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
SequenceFile.Writer.file(new Path(outputFile)),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class)
};
// 创建一个writer实例
SequenceFile.Writer writer = SequenceFile.createWriter(conf,opts);
// 指定需要压缩的文件的目录
File inputDirPath = new File(inputDir);
if(inputDirPath.isDirectory()){
File[] files = inputDirPath.listFiles();
// 迭代文件
for(File file:files){
// 获取文件的全部内容
String content = FileUtils.readFileToString(file,"UTF-8");
// 获取文件名
String fileName = file.getName();
// 获取 key value key 为文件名 value 为文件内容
Text key = new Text(fileName);
Text value = new Text(content);
// 向SequenceFile中写入数据
writer.append(key,value);
}
}
writer.close();
}
private static void read(String inputFile) throws Exception{
// 创建一个配置对象
Configuration conf = new Configuration();
// 指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
// 创建阅读器
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
Text key = new Text();
Text value = new Text();
// 循环读取数据
while(reader.next(key,value)){
//输出文件名称
System.out.print("文件名:"+key.toString()+",");
//输出文件内容
System.out.println("文件内容:"+value.toString()+"");
}
reader.close();
}
}
MapFile实现小文件存储代码实现
package com.imooc.mr;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.File;
public class SmallFileMap {
public static void main(String[] args) throws Exception{
//生成MapFile文件
write("E:\smallfile","/mapFile");
// 读取MapFile 文件
read("/mapFile");
}
private static void write(String inputDir,String outputDir) throws Exception{
// 创建配置对象
Configuration conf = new Configuration();
// 指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
// 获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);
// 删除HDFS 上的输出文件 保证函数可以重复执行
fileSystem.delete(new Path(outputDir),true);
SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
MapFile.Writer.keyClass(Text.class),
MapFile.Writer.valueClass(Text.class)
};
// 创建一个writer实例
MapFile.Writer writer = new MapFile.Writer(conf, new Path(outputDir), opts);
// 指定需要压缩的文件的目录
File inputDirPath = new File(inputDir);
if(inputDirPath.isDirectory()){
File[] files = inputDirPath.listFiles();
// 迭代文件
for(File file:files){
// 获取文件的全部内容
String content = FileUtils.readFileToString(file,"UTF-8");
// 获取文件名
String fileName = file.getName();
// 获取 key value key 为文件名 value 为文件内容
Text key = new Text(fileName);
Text value = new Text(content);
// 向SequenceFile中写入数据
writer.append(key,value);
}
}
writer.close();
}
private static void read(String inputDir) throws Exception{
// 创建一个配置对象
Configuration conf = new Configuration();
// 指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
// 创建阅读器
MapFile.Reader reader = new MapFile.Reader(new Path(inputDir),conf);
Text key = new Text();
Text value = new Text();
// 循环读取数据
while(reader.next(key,value)){
//输出文件名称
System.out.print("文件名:"+key.toString()+",");
//输出文件内容
System.out.println("文件内容:"+value.toString()+"");
}
reader.close();
}
}
SequenceFile实现小文件计算
package com.imooc.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountJobSeq {
public static class MyMapper extends Mapper{
// 使用Logger进行日志输出
//Logger logger = LoggerFactory.getLogger(MyMapper.class);
@Override
protected void map(Text k1,Text v1,Context context) throws IOException, InterruptedException {
// 输出 k1 ,v1 的值
//System.out.println("=<"+k1.toString()+","+v1.toString()+">");
//使用log 输出
// logger.info("=<"+k1.get()+","+v1.toString()+">");
String[] words = v1.toString().split(" ");
// 迭代切割出来的单词数据
for(String word:words){
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//System.out.println("");
//logger.info("");
context.write(k2,v2);
}
}
}
public static class MyReduce extends Reducer{
//Logger logger = LoggerFactory.getLogger(MyReduce.class);
@Override
protected void reduce(Text k2, Iterable v2s, Context context) throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的值
long sum = 0L;
for(LongWritable v2:v2s){
sum +=v2.get();
}
// 组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
// 输出k3,v3的值
//System.out.println("=<"+k3.toString()+","+v3.get()+">");
// 使用Logger 进行日志输出
//logger.info("=<"+k3.toString()+","+v3.get()+">");
// 把结果写出去
context.write(k3,v3);
}
}
public static void main(String[] args) {
try {
if(args.length!=2){
System.exit(100);
}
// 指定Job需要配置的参数
Configuration conf = new Configuration();
// 创建一个Job
Job job = Job.getInstance(conf);
//
job.setJarByClass(WordCountJobSeq.class);
//指定输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 指定输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 指定map 相关的代码
job.setMapperClass(MyMapper.class);
// 指定 K2的类型
job.setMapOutputKeyClass(Text.class);
// 指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
// 设置输出数据处理类,生成K1,V1的FileInputFormat 处理子类,默认是TextInputFormat
job.setInputFormatClass(SequenceFileInputFormat.class);
// 指定reduce相关的代码
job.setReducerClass(MyReduce.class);
// 指定K3的类型
job.setMapOutputKeyClass(Text.class);
// 指定v3的类型
job.setOutputValueClass(LongWritable.class);
// 提交Job
job.waitForCompletion(true);
}catch (Exception e){
e.printStackTrace();
}
}
}
4.2 解决数据倾斜SequenceFile编译执行 hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJobSeq /seqFile /out50,MapFile 编译执行命令 hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJobSeq /mapFile/data /out60 MapFile生成的是一个目录文件,里面含有索引和数据,选取数据计算就是MapFile的计算代码实现(代码和SequenceFile计算代码一致)
数据倾斜产生原因
数据分布不均匀,导致某一个或某几个reduce 任务耗时过长。解决数据倾斜实际上是解决资源的平均利用。提高MapReduce的执行效率。(有点类似木桶原理)
源码分析
package org.apache.hadoop.mapreduce.lib.partition; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.Partitioner; @InterfaceAudience.Public @InterfaceStability.Stable public class HashPartitionerextends Partitioner { public int getPartition(K key, V value, int numReduceTasks) { // 任何值对1进行取模运算得到的结果是0,所以要增加分区,需要改变reduce 任务数,但是如果数值分布不均,会导致大量相同的数据在一个分区进行运算 //这就产生了数据倾斜,所以解决数据倾斜的方法很简单,将数据打散,重新分区,然后对结果在进行一次Reduce运算。 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } } // 第一步 job.setNumReduceTasks() public void setNumReduceTasks(int tasks) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setNumReduceTasks(tasks); } // 第二步 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); } // 第三步 public static final String NUM_REDUCES = "mapreduce.job.reduces"; // 第四步 可以看到 numReduceTask 默认值是1 在源码 mapred-default.xml 中有设置默认值 mapreduce.job.reduces 1 The default number of reduce tasks per job. Typically set to 99% of the cluster's reduce capacity, so that if a node fails the reduces can still be executed in a single wave. Ignored when mapreduce.framework.name is "local".
代码实现
数据准备
准备数据倾斜数据 5–900w,1-4 6-10 100W,生成共1000W条数据。
private static void genderate_1000w(){
Random random =new Random();
String fileName ="E:\test_1000W.txt";
System.out.println("start: 开始生成1000W条数据->"+fileName);
try(FileWriter fileWriter = new FileWriter(fileName);
BufferedWriter bfw =new BufferedWriter(fileWriter)){
int num=0;
while(num<10000000){
int i = random.nextInt(10)+1;
if(num<=999999){
if(i!=5){
// 加一串字符只是为了将文件撑大,使效果明显
bfw.write(Integer.toString(i).concat(" bkadjfkfjkdfhksajdkfhdkfassdfdfdjbkkkkkkkkksaasd"));
bfw.newline();
num++;
}
}else{
bfw.write("5"+" bkadjfkfjkdfhksajdkfhdkfassdfdfdjbkkkkkkkkksaasd");
bfw.newline();
num++;
}
}
}catch (Exception e){
e.printStackTrace();
}
}
数据打散
package com.imooc.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Random;
public class WordCountJobSkewRandKey {
public static class MyMapper extends Mapper{
// 使用Logger进行日志输出
//Logger logger = LoggerFactory.getLogger(MyMapper.class);
Random random = new Random();
@Override
protected void map(LongWritable k1,Text v1,Context context) throws IOException, InterruptedException {
// 输出 k1 ,v1 的值
//System.out.println("=<"+k1.get()+","+v1.toString()+">");
//使用log 输出
// logger.info("=<"+k1.get()+","+v1.toString()+">");
String[] words = v1.toString().split(" ");
// 切割出来的数据我们只需要words[0]就可以了,主要是统计数字出现的次数
String key = words[0];
if("5".equals(key)){
// 将key 为5的数据打散 生成 数据 5_1 5_2...
int i = random.nextInt(10)+1;
key = "5"+"_"+i;
}
Text k2 = new Text(key);
LongWritable v2 = new LongWritable(1L);
// 把写出
context.write(k2,v2);
}
}
public static class MyReduce extends Reducer{
//Logger logger = LoggerFactory.getLogger(MyReduce.class);
@Override
protected void reduce(Text k2, Iterable v2s, Context context) throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的值
long sum = 0L;
for(LongWritable v2:v2s){
sum +=v2.get();
//模拟Reduce的复杂计算消耗的时间
if(sum%200 == 0){
Thread.sleep(1);
}
}
// 组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
// 输出k3,v3的值
//System.out.println("=<"+k3.toString()+","+v3.get()+">");
// 使用Logger 进行日志输出
//logger.info("=<"+k3.toString()+","+v3.get()+">");
// 把结果写出去
context.write(k3,v3);
}
}
public static void main(String[] args) {
try {
if(args.length!=3){
System.exit(100);
}
// 指定Job需要配置的参数
Configuration conf = new Configuration();
// 创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJobSkew这个类的
job.setJarByClass(WordCountJobSkewRandKey.class);
//指定输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 指定输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 指定map 相关的代码
job.setMapperClass(MyMapper.class);
// 指定 K2的类型
job.setMapOutputKeyClass(Text.class);
// 指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
// 指定reduce相关的代码
job.setReducerClass(MyReduce.class);
// 指定K3的类型
job.setMapOutputKeyClass(Text.class);
// 指定v3的类型
job.setOutputValueClass(LongWritable.class);
// 设置Reducer任务个数,第三个参数为输入的Reduce个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
// 提交Job
job.waitForCompletion(true);
}catch (Exception e){
e.printStackTrace();
}
}
}
再次聚合
package com.imooc.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
public class WordCountJobSkewRandKeyReduce {
public static class MyMapper extends Mapper{
// 使用Logger进行日志输出
//Logger logger = LoggerFactory.getLogger(MyMapper.class);
Random random = new Random();
@Override
protected void map(LongWritable k1,Text v1,Context context) throws IOException, InterruptedException {
// 输出 k1 ,v1 的值
//System.out.println("=<"+k1.get()+","+v1.toString()+">");
//使用log 输出
// logger.info("=<"+k1.get()+","+v1.toString()+">");
// 以制表符作为切割
String[] words = v1.toString().split("t");
// 切割出来的数据我们只需要words[0]就可以了,主要是统计数字出现的次数
// 截取需要的数字
String key = words[0].split("_")[0];
Text k2 = new Text(key);
//System.out.println("k2 ="+k2.toString()+"v2="+words[1]);
LongWritable v2 = new LongWritable((Long.parseLong(words[1])));
// 把写出
context.write(k2,v2);
}
}
public static class MyReduce extends Reducer{
//Logger logger = LoggerFactory.getLogger(MyReduce.class);
@Override
protected void reduce(Text k2, Iterable v2s, Context context) throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的值
long sum = 0L;
for(LongWritable v2:v2s){
sum +=v2.get();
//模拟Reduce的复杂计算消耗的时间
if(sum%200 == 0){
Thread.sleep(1);
}
}
// 组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
// 输出k3,v3的值
//System.out.println("=<"+k3.toString()+","+v3.get()+">");
// 使用Logger 进行日志输出
//logger.info("=<"+k3.toString()+","+v3.get()+">");
// 把结果写出去
context.write(k3,v3);
}
}
public static void main(String[] args) {
try {
if(args.length!=3){
System.exit(100);
}
// 指定Job需要配置的参数
Configuration conf = new Configuration();
// 创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJobSkewRandKeyReduce这个类的
job.setJarByClass(WordCountJobSkewRandKeyReduce.class);
//指定输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 指定输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 指定map 相关的代码
job.setMapperClass(MyMapper.class);
// 指定 K2的类型
job.setMapOutputKeyClass(Text.class);
// 指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
// 指定reduce相关的代码
job.setReducerClass(MyReduce.class);
// 指定K3的类型
job.setMapOutputKeyClass(Text.class);
// 指定v3的类型
job.setOutputValueClass(LongWritable.class);
// 设置Reducer任务个数,第三个参数为输入的Reduce个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
//获取分区信息
//job.getPartitionerClass();
// 提交Job
job.waitForCompletion(true);
}catch (Exception e){
e.printStackTrace();
}
}
}
4.3 MapReduce性能优化总结在这个地方还可以使用sequenceFile进行Map阶段优化,这样可以优化Map效率。
MapReduce总共分为Map阶段和Reduce阶段,所以优化也是从这两个方面入手。
Map
小文件使用SequenceFile或这MapFile进行压缩,减少Map频繁启动,对小文件来说Map进程启动时间比计算时间耗时长。Map任务数由InputSplit决定,
InputSplit个数又和block个数相关,一般增加不了Map任务数。
Reduce
增加Reduce 任务数,针对数据倾斜不严重的数据有显著提升作用。对数据倾斜严重数据,参考解决数据倾斜方案,并增加Reduce 任务数。
仔细回想MapReduce 性能优化和平时开发中遇到的问题,发现还真是这样。计算机科学领域的任何问题都可以通过增加一个间接的中间层来解决。——David Wheeler



