- 一、关于 Hadoop 的 MapReduce
- 二、关于 Map task 、Reduce task 和 Shuffle
- 1、map 任务处理
- 2、reduce 任务处理
- 3、shuffle
- 三、关于 MapReduce 的使用
- 1、使用测试样例
- 2、开始客制化 Map 和 Reduce
- 2.1 单词统计
- 2.2 单词统计 ver 2.0 预聚合
- 2.3 按照班级统计年龄和
- 2.4 筛选女性(只使用 map ,关闭 reduce)
- 2.5 关于开启两个 reduce
- 2.6 两表联查(Reduce Join)
- 2.7 两表联查(Map Join)
- 四、 yarn
- 一个 MapReduce 获取资源的过程
- 总结
I know, i know
地球另一端有你陪我
一、关于 Hadoop 的 MapReduce
MapReduce 是一种分布式计算模型,主要用于搜索领域,解决海量数据的计算问题.
MapReduce 是分布式运行的,由两个阶段组成:Map 和 Reduce
简单理解就是将数据分组处理后,再聚合
MapReduce 框架都有默认实现,用户只需要覆盖map()和reduce()两个函数,即可实现
这两个函数的形参和返回值都是
二、关于 Map task 、Reduce task 和 Shuffle 1、map 任务处理
1 将原本 HDFS 中的数据进行切片(InputSplit),大小默认 128MB ,同 Block 大小相同
其中一个切片对应一个 map task,并将内部的数据解析成一个个
默认,k1 是一行的偏移量,v1 对应一行的数据
2 框架调用 Mapper 类中的 map() 方法,做进一步处理,输出依然是
可以通过继承 Mapper 类,重写 map() 方法,实现客制化
3.1 当 reduce 不存在
框架对 map 结果直接输出到 HDFS 中,结束 MapReduce
3.2 当 reduce 存在
框架对 map 输出的
注:默认只有1个分区,且并不是随机分配,而是根据值取余计算
4 框架对分区中乱序的
5 (可选)预聚合(combine),对不同切片造成多个 map task 内,
多个有序分区内的数据进行预处理,如提前加和等
6 上述完成后,会将数据第一次写入 linux 的磁盘文件中,此为第一次数据落地
7 最后根据 k 的值,将这些有序
至此,整个 map 阶段结束
1 框架对多个 map task 的输出,按照 k 的值进行接收
2 过程中,框架会将具有相同 k 的
期间会将数据再次写入到 linux 的磁盘文件中,此为第二次数据落地
3 框架调用 Reducer 类中的 reduce() 方法,接收的形参即上述 k,{v,v,v…} 格式
可以通过继承 Reducer 类,重写 reduce() 方法,实现客制化
4 框架把 reduce 的输出结果保存到 HDFS 中
至此,整个 reduce 阶段结束
它是包含了 map 和 reduce 两部分的一系列操作,可以看做是一种交集
map() 方法调用之后到 reduce() 方法调用之前,即为 shuffle
关于第一次数据落地补充
1 每个 map task 有一个环形内存缓冲区,用于存储 map 的输出
默认大小100MB(io.sort.mb属性),一旦达到阈值0.8(io.sort.spill.percent)
一个后台线程把内容溢写到(spill)磁盘的指定目录(mapred.local.dir)下的一个新建文件中
2 写磁盘前,要进行排序、分组(partition、sort),如果有combiner,写入 combine 后数据
3.等最后记录写完,合并全部文件为一个分区且排序的文件
三、关于 MapReduce 的使用 1、使用测试样例
通用的 MapReduece 里包含一套单词统计方法用作测试,目录位于
cd /usr/local/soft/hadoop-2.7.6/share/hadoop/mapreduce/
在 Linux 中调用 MapReduce 方法
hadoop jar 使用的jar包 从java路径开始,指名所使用类名 hdfs输入文件路径 hdfs输出文件路径
注:输出文件路径必须不存在,否则无法运行
hadoop jar hadoop-mapreduce-examples-2.7.6.jar wordcount /words.txt /output
2、开始客制化 Map 和 Reduce
使用者可以通过继承 Mapper、Reducer 类,重写 map() 、reduce() 方法,实现客制化
这里从最基础的单词统计开始
package day43;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCount01 {
public static class WordMapper
extends Mapper{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//先把一行数据取下来
String line = value.toString();
//挨个发送
context.write(value,new LongWritable(1));
//map方法会总动对每一行都做出操作,所以遍历交给她就好了
}
}
public static class WordReducer
extends Reducer{
@Override
protected void reduce(Text key, Iterable values
, Context context) throws IOException, InterruptedException {
//
//通过迭代器聚合每一个1
long count=0;
for (LongWritable value : values) {
//转换一下数据类型为long
count += value.get();
}
//挨个抬走
context.write(key,new LongWritable(count));
//同样,reduce也会对每一个操作,遍历也不需要担心
}
}
public static void main(String[] args) throws Exception{
// 配置mapreduce
Job job = Job.getInstance();
//赋予job名字
job.setJobName("第二个MR,单词统计考虑一行多个,预聚合");
//反射一堆应用类
//jar包里的当前使用类
job.setJarByClass(WordCount01.class);
//Map端所在类的位置
job.setMapperClass(WordMapper.class);
//指定map端kv的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//reduce端所在类的位置
job.setReducerClass(WordReducer.class);
//指定reduce端kv的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//指定Hadoop路径
Path input = new Path("/words.txt");
//输出路径不能已存在
Path out = new Path("/output");
//提前加载路径,注意类的区分
//org.apache.hadoop.mapreduce.lib.input.FileInputFormat
//org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
FileInputFormat.addInputPath(job,input);
FileOutputFormat.setOutputPath(job,out);
//启动
job.waitForCompletion(true);
//最后提示一下
System.out.println("WordCount01已完成");
}
}
跑一下,记得打包上传到HDFS,和清理 output 文件
由于代码中,已经指定了输入路径和输出路径,所以调用时不用再声明
//进入jar包路径 cd /usr/local/test/wordcount hadoop jar wordcount01.jar day43.WordCount01
2.2 单词统计 ver 2.0 预聚合
package day43;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCount02 {
public static class WordMapper
extends Mapper{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//老样子,先把一行数据取下来
String line = value.toString();
//拆分获得字符串数组,再挨个发送
String[] words = line.split(",");
for (String word : words) {
context.write(new Text(word),new LongWritable(1));
}
//map方法会总动对每一行都做出操作,所以遍历交给她就好了
}
}
//combine 预聚合 一个发生在reduce之前reduce端,用以提高效率
//实现上和reducer几乎完全相同,只需在main中额外开启
public static class Combine
extends Reducer{
@Override
protected void reduce(Text key, Iterable values
, Context context) throws IOException, InterruptedException {
//提前为当前map task里的预加和
long count=0;
for (LongWritable value : values) {
//转换一下数据类型为long
count += value.get();
}
//挨个抬走
context.write(key,new LongWritable(count));
//同样,reduce也会对每一个操作,遍历也不需要担心
}
}
public static class WordReducer
extends Reducer{
@Override
protected void reduce(Text key, Iterable values
, Context context) throws IOException, InterruptedException {
//
//通过迭代器聚合每一个1
long count=0;
for (LongWritable value : values) {
//转换一下数据类型为long
count += value.get();
}
//挨个抬走
context.write(key,new LongWritable(count));
//同样,reduce也会对每一个操作,遍历也不需要担心
}
}
public static void main(String[] args) throws Exception{
// 配置mapreduce
Job job = Job.getInstance();
//赋予job名字
job.setJobName("第二个MR,单词统计考虑一行多个,预聚合");
//反射一堆应用类
//jar包里的当前使用类
job.setJarByClass(WordCount02.class);
//Map端所在类的位置
job.setMapperClass(WordMapper.class);
//指定map端kv的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//combine端所在类的位置
job.setCombinerClass(Combine.class);
//reduce端所在类的位置
job.setReducerClass(WordReducer.class);
//指定reduce端kv的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//指定Hadoop路径
Path input = new Path("/words.txt");
//输出路径不能已存在
Path out = new Path("/output");
//连接HDFS,如果路径存在,就消灭
//已经连上Hadoop,所以不再需要端口号等,给一个空的
FileSystem fs = FileSystem.get(new Configuration());
if(fs.exists(out)){
//true迭代删除
fs.delete(out,true);
}
//提前加载路径,注意类的区分
//org.apache.hadoop.mapreduce.lib.input.FileInputFormat
//org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
FileInputFormat.addInputPath(job,input);
FileOutputFormat.setOutputPath(job,out);
//启动
job.waitForCompletion(true);
//最后提示一下
System.out.println("WordCount02已完成");
}
}
预聚合可以极大程度减轻 reduce 环节的处理量,提高程序的效率
hadoop jar wordcount01.jar day43.WordCount02
2.3 按照班级统计年龄和
局部代码(都差不多,不全贴了)
public static class ClazzMapper
extends Mapper{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String student = value.toString();
//1500100001,施笑槐,22,女,文科六班
String[] split = student.split(",");
String ages = split[2];
//String转数字 Integer.parseInt
long age = Integer.parseInt(ages);
String clazz = split[4];
context.write(new Text(clazz),new LongWritable(age));
}
}
public static class Combiner{...}
public static class ClazzReducer
extends Reducer{
@Override
protected void reduce(Text key, Iterable values
, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable age : values) {
count += age.get();
}
context.write(key,new LongWritable(count));
}
}
2.4 筛选女性(只使用 map ,关闭 reduce)
默认情况下,框架一定会给程序生成一个 reduce task 即使没有在 main 中声明
想要完全关闭,需要手动设置参数为0
job.setNumReduceTasks(0);
局部代码
public static class SexMapper
extends Mapper{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] student = line.split(",");
//1500100001,施笑槐,22,女,文科六班
String sex = student[3];
if("女".equals(sex)){
context.write(new Text(line),NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception{
Job job = Job.getInstance();
//reduce会默认生成一个,想要关闭,需要手动设置参数为0
job.setNumReduceTasks(0);
...省略,一处要注意
//指定 map 的输出类时,要换成 null
job.setMapOutputValueClass(NullWritable.class);
}
2.5 关于开启两个 reduce
同上,手动控制 reduce task 的数量
public static class FileMapper
extends Mapper{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
public static class FileReducer
extends Reducer{
@Override
protected void reduce(Text key, Iterable values
, Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
public static void main(String[] args) throws Exception{
...
//设置两个reduce task
job.setNumReduceTasks(2);
...
//要点同上
job.setMapOutputValueClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
}
当使用两个 reduce task 后,map 后生成的
最终输出两组结果,得到两个文件
package day43;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
public class Join {
public static class JoinMapper
extends Mapper{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//context承上启下,连接着split切片
//通过切片,可以获得当前信息来自的路径
//students一定是以students.txt结尾
//score一定是以score.txt结尾
InputSplit inputSplit = context.getInputSplit();
//需要转化成FileSplit,注意类
//org.apache.hadoop.mapreduce.lib.input.FileSplit
FileSplit fileSplit = (FileSplit)inputSplit;
//获得路径的String
//这里有点怪,FileSplit是InputSplit的实现子类
//但是InputSplit并没有getPath()方法
//向下强制转型后却可以正常使用
String path = fileSplit.getPath().toString();
//判断结尾,打上标记,学生是#,分数是$,方便reduce阶段处理
if(path.endsWith("students.txt")){
String id = value.toString().split(",")[0];
String stu = "#" + value.toString();
context.write(new Text(id),new Text(stu));
} else{
String id = value.toString().split(",")[0];
String score = "$" + value.toString();
context.write(new Text(id),new Text(score));
}
}
}
public static class JoinReducer
extends Reducer{
//可能是
//也可能是
@Override
protected void reduce(Text key, Iterable values
, Context context) throws IOException, InterruptedException {
//存储当前学生信息,去掉#
String student = null;
//用来计算当前分数总和
ArrayList scores = new ArrayList();
//用来计算分数和
long sum = 0;
for (Text value : values) {
String s = value.toString();
if(s.startsWith("#")){
//是学生信息,去头部的#
student = s.substring(1);
} else {
//是分数,丢到list
long score = Integer.parseInt(s.split(",")[3]);
scores.add(score);
}
}
//求和,再拼接
for (Long score : scores) {
sum += score;
}
student = student + ","+sum;
context.write(new Text(student),NullWritable.get());
}
}
public static void main(String[] args) throws Exception{
Job job = Job.getInstance();
job.setJobName("join");
job.setJarByClass(Join.class);
//map
job.setMapperClass(JionMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//reduce
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
Path input1 = new Path("/students.txt");
FileInputFormat.addInputPath(job,input1);
Path input2 = new Path("/score.txt");
FileInputFormat.addInputPath(job,input2);
//路径不能已存在
// 手动加上 如果存在就删除 FileSystem
Path output = new Path("/output");
FileSystem fs = FileSystem.get(new Configuration());
if(fs.exists(output)){
fs.delete(output,true);
}
FileOutputFormat.setOutputPath(job,output);
//启动job
job.waitForCompletion(true);
}
}
2.7 两表联查(Map Join)
2.6 中,将大量数据塞到 reduce task 中处理(实际上是在硬盘中处理),非常影响效率
针对一种特殊情况:连表时,一张表比较小,可以放进内存中
我们会尝试将小表塞进内存(java 中构建 hash 表,可以以 key 值为索引,查得快)
每一个 map task 都安排一个,在 map 阶段就从大表中依次与小表匹配,能够让效率更高。
这种把小表提溜出来示众的方法也叫做小表广播
package day43;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
public class MapJoin {
public static class JoinMapper
extends Mapper{
HashMap hm = new HashMap();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
//获取文件操作系统的操作对象
//已经登入,因此不需要ip和端口
FileSystem fs = FileSystem.get(context.getConfiguration());
Path path = new Path("/score.txt");
FSDataInputStream open = fs.open(path);
//给小表存进hashmap 即内存中
BufferedReader br
= new BufferedReader(new InputStreamReader(open));
String line;
while((line=br.readLine())!=null){
hm.put(line.split(",")[0],line);
}
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(",");
String id = split[0];
String score = split[2];
//查找学生信息
String student = hm.get(id);
//开始获取,拼接
context.write(new Text(student),new Text(score));
}
}
public static class JoinReducer
extends Reducer{
@Override
protected void reduce(Text key, Iterable values
, Context context) throws IOException, InterruptedException {
long sum=0;
for (Text value : values) {
long score = Integer.parseInt(value.toString());
sum += score;
}
String student = key.toString();
String all = student + sum;
context.write(new Text(all), NullWritable.get());
}
}
public static void main(String[] args) throws Exception{
Job job = Job.getInstance();
job.setJobName("MapJoin");
job.setJarByClass(MapJoin.class);
//map
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//reduce
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
Path input1 = new Path("/students.txt");
FileInputFormat.addInputPath(job,input1);
Path input2 = new Path("/score.txt");
FileInputFormat.addInputPath(job,input2);
Path output = new Path("/output");
FileSystem fs = FileSystem.get(new Configuration());
if(fs.exists(output)){
fs.delete(output,true);
}
FileOutputFormat.setOutputPath(job,output);
//启动job
job.waitForCompletion(true);
}
}
没跑,保不齐有点小错误
四、 yarn
资源的调度和管理平台
主节点,可以有2个
ResourceManager
负责集群资源的分配与调度
MapReduce、Storm、Spark 等应用,必须实现 ApplicationMaster 接口,才能被RM管理
从节点,有很多个
NodeManager
单节点资源的管理(CPU+内存)
1、提交一个 MR ,会先想 RM 做出资源申请
2、RM 在 NM 中申请一个容器(container),
3、在容器中启动 Application Master ,相应启动一个 JVM
4、AM 获得 task 信息后,向 RM 为每一个 task 各申请一个 container
5、在每个 container 中,执行对应的 task
6、执行完毕后, container 被 NM 回收,JVM 退出
要注意 main 方法中,对应输入的对象 class
NullWritable NullWritable.class Text Text.class LongWritable LongWritable.class



