栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

大数据学习之MapReduce

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

大数据学习之MapReduce

MapReduce概述

MapReduce是一种分布式并行编程框架,借助一个集群通过多台机器同时并行处理大规模数据集

  • MapReduce的策略
    MapReduce采用分而治之的策略。把庞大的数据集,切分成非常多的独立的小分片,然后为每个分片单独地启动一个map任务,最终通过多个map任务,并行地在多个机器上处理
  • MapReduce的理念
  1. 计算向数据靠拢而不是数据向计算靠拢
  2. 要完成一次数据分析时,选择一个计算节点,把运行数据分析的程序放到计算节点上运行
  3. 然后把它所涉及的数据,全部从各个不同的节点上面拉过来,传输到计算发生的地方


    MapReduce将复杂的集群计算过程,高度的抽象成Map函数和Reduce函数
函数输入输出说明
Map1,v2>
如:<行号,”a b c”>
List(2,v2>)
如:<“a”,1>
<“b”,1>
<“c”,1>
1.将小数据集进一步解析成一批对,输入Map函数中进行处理
2.每一个输入的1,v1>会输出一批2,v2>。其中2,v2>是计算的中间结果
Reduce2,List(v2)>
如:<“a”,<1,1,1>>
3,v~3>
<“a”,3>
输入的中间结果2,List(v2)>中的List(v2)表示是一批属于同一个k2的value
MapReduce 体系结构

  1. Client 客户端
    用户编写的MapReduce程序通过Client提交到JobTracker端
    用户可通过Client提供的一些接口查看作业运行状态
  2. JobTracker 作业追踪器
    负责资源的监控和作业的调度
    监控底层的其它TaskTracker以及当前运行的Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点
    跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源
  3. TaskTracker 任务调度器
    接收JobTracker发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)
    TaskTracker会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker
    TaskTracker如何衡量自己资源使用情况?
    使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task获取到一个slot后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为Map slot和Reduce slot两种,分别供MapTask和Reduce Task使用,两种slot不通用。
  4. Task 任务
    Task分为Map Task和Reduce Task两种,均由TaskTracker启动
MapReduce的工作流程

  • 不同的Map任务之间不会进行通信
  • 不同的Reduce任务之间也不会发生任何信息交换
  • 用户不能显式地从一台机器向另一台机器发送消息
  • 所有的数据交换都是通过MapReduce框架自身去实现的
MapReduce执行的各个阶段

  1. 首先从分布式系统HDFS上通过InputFormat模块加载读取文件,InputFormat负责对输入进行验证,并且把大的数据进行逻辑上的切分Split,Split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。

    Hadoop为每个split创建一个Map任务,split的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块,防治一个分片横跨多个block上,而block不在同一台机器上
    Reduce任务的数量
    最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目
    通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)

  2. 再由记录阅读器RR根据分片的位置和长度信息,从HDFS中把相关分片读取出来,输出为形式

  3. Map函数只接收形式的输入,Map函数中是用户定义的处理逻辑,完成相关的数据处理,生产中间结果一堆的

  4. 中间结果经过Shuffle(分区、排序、合并、归并)过程,才能把相关的键值对分发给相对应的Reduce任务来处理

  • Map端的Shuffle过程
    收到
    每个Map任务分配一个缓存
    MapReduce默认100MB缓存
    设置溢写比例0.8
    分区默认采用哈希函数
    排序是默认的操作
    排序后可以合并(Combine)
    合并不能改变最终结果
    在Map任务全部结束之前进行归并
    归并得到一个大的文件,放在本地磁盘
    文件归并时,如果溢写文件数量大于预定值(默认是3)则可以再次启动Combiner,少于3不需要
    JobTracker会一直监测Map任务的执行,并通知Reduce任务来领取数据
    合并(Combine)和归并(Merge)的区别:两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>
  • Reduce端的Shuffle过程

    Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,若完成,则领取数据
    Reduce领取数据先放入缓存,来自不同Map机器,先归并,再合并,写入磁盘
    多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
    当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce
  1. Reduce任务接收到之后,处理用户撰写的Reduce函数中的处理逻辑,完成对数据的分析,最终结果的输出格式为
  2. 最终结果经过OutputFormat模块,对输出格式进行检查,以及输出相关目录是否存在
  3. 最后写入到分布式文件系统HDFS中
MapReduce应用程序的执行过程

  1. 程序部署
  2. 执行Map任务
  3. 读数据,到缓存
  4. 本地(磁盘)写数据
  5. 远程读数据
  6. 写输出HDFS文件
MapReduce实例分析:WordCount 1. WordCount程序任务
程序WordCount
输入一个包含大量单词的文本文件
输出文件中每个单词及其出现次数(频数),并按照单词字母顺序排序
每个单词和其频数占一行,单词和频数之间有间隔
输入输出
Hello World
Hello Hadoop
Hello MapReduce
Hadoop , 1
Hello , 3
MapReduce ,1
World , 1

不是所有的任务都可以使用MapReduce进行处理,只有满足分而治之的任务才可以

2.WordCount设计思路

首先,需要检查WordCount程序任务是否可以采用MapReduce来实现
其次,确定MapReduce程序的设计思路
最后,确定MapReduce程序的执行过程

3. WordCount执行过程


MapReduce 的具体应用

MapReduce可以很好地应用于各种计算问题

  • 关系代数运算(选择、投影、并、交、差、连接)
  • 分组与聚合运算
  • 矩阵-向量乘法
  • 矩阵乘法
用MapReduce实现自然连接

假设有关系R(A,B)和S(B,C),对二者进行自然连接操作
使用Map过程,把来自R的每个元组转换成一个键值对>,其中的键就是属性B的值。把关系R包含到值中,这样做使得我们可以在Reduce阶段,只把那些来自R的元组和来自S的元组进行匹配。类似地,使用Map过程,把来自S的每个元组,转换成一个键值对>
所有具有相同B值的元组被发送到同一个Reduce进程中,Reduce进程的任务是,把来自关系R和S的、具有相同属性B值的元组进行合并
Reduce进程的输出则是连接后的元组,输出被写到一个单独的输出文件中

MapReduce编程实践 1.明确任务
  • 文件A中的内容:
China is my motherland
I love China
  • 文件B中的内容:
I am from China
  • 期望结果:
I					2
is					1
China				3
my					1
love				1
am					1
from				1
motherland			1
2.编写Map处理逻辑
  • Map输入类型为
  • 期望的Map输出类型为<单词,出现次数>
  • Map输入类型最终确定为
  • Map输出类型最终确定为
// 继承Mapper类;Map输入类型为Object,Text;Map输出类型为Text,IntWritable
public static class MyMapper extends Mapper{
	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();
	// 覆盖map方法
	public void map(Object key, Text value, Context context) throws
IOException,InterruptedException{
		// 将英文句子拆分成单词,再把单词转换成(单词,1)的形式
		StringTokenizer itr = new StringTokenizer(value.toString());
		while (itr.hasMoreTokens()){  //对单词进行遍历
			word.set(itr.nextToken());
			context.write(word,one);
		}
	}
}
3.编写Reduce处理逻辑

在Reduce处理数据之前,Map的结果首先通过Shuffle阶段进行整理
Reduce阶段的任务:对输入数字序列进行求和
Reduce的输入数据为
Reduce任务的输入数据:
<”I”,<1,1>>
<”is”,1>
……
<”from”,1>
<”China”,<1,1,1>>

// 继承自Reduce类;
public static class MyReducer extends Reducer{
	// 词频汇总结果
	private IntWritable result = new IntWritable();
	// 覆盖reduce方法
	public void reduce(Text key, Iterable values, Context context) throws IOException,InterruptedException{
		// 求和 初始值
		int sum = 0;
		// 对传进来的value(单词,1)进行变量
		for (IntWritable val : values){
			// 单词出现次数的累加
			sum += val.get();
		}
		result.set(sum);
		context.write(key,result);
	}
}
4.编写main方法
public static void main(String[] args) throws Exception{
	Configuration conf = new Configuration(); //程序运行时参数
	String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
	if (otherArgs.length != 2){
		System.err.println("Usage: wordcount  ");
		System.exit(2);
	}
	Job job = new Job(conf,"word count"); //设置环境参数
	job.setJarByClass(WordCount.class); //设置整个程序的类名
	job.setMapperClass(MyMapper.class); //添加MyMapper类
	job.setReducerClass(MyReducer.class); //添加MyReducer类
	job.setOutputKeyClass(Text.class); //设置输出类型
	job.setOutputValueClass(IntWritable.class); //设置输出类型
	FileInputFormat.addInputPath(job,new Path(otherArgs[0])); //设置输入文件
	FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); //设置输出文件
	System.exit(job.waitForCompletion(true)?0:1);
}
5. 完整代码
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount{
	public static class MyMapper extends Mapper{
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
		StringTokenizer itr = new StringTokenizer(value.toString());
		while (itr.hasMoreTokens()){
			word.set(itr.nextToken());
			context.write(word,one);
		}
	}
}
public static class MyReducer extends Reducer{
	private IntWritable result = new IntWritable();
	public void reduce(Text key, Iterable values, Context context) throws IOException,InterruptedException{
		int sum = 0;
		for (IntWritable val : values){
			sum += val.get();
		}
		result.set(sum);
		context.write(key,result);
	}
}

public static void main(String[] args) throws Exception{
	Configuration conf = new Configuration();
	String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
	if (otherArgs.length != 2){
		System.err.println("Usage: wordcount  ");
		System.exit(2);
	}
	Job job = new Job(conf,"word count");
	job.setJarByClass(WordCount.class);
	job.setMapperClass(MyMapper.class);
	job.setReducerClass(MyReducer.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);
	FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
	FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
	System.exit(job.waitForCompletion(true)?0:1);
	}
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/677689.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号