MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。 MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的 分布式运算程序,并发运行在一个 Hadoop 集群上。
优点:易于编程(用户只关心业务逻辑即可)、良好的扩展性(可以动态增加服务器)、高容错性(任意一台机器挂掉,可以将任务转移到另一节点)、适合海量数据计算(PB/BP)
缺点:不擅长实时计算、不擅长流式计算、不擅长 DAG(有向无环图)计算(相当于一个迭代式的计算,一台服务器计算后结果一次传递下去)
MapReduce的核心思想
注:Map阶段由MapTask并发实例,完全并行运行,互不相干,reduce阶段由reduceTask并发实例互不相干,但是依赖于上一阶段Map的输出。
MapReduce编程规范
1、Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper中的业务逻辑写在map()方法中
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(5)map()方法(MapTask进程)对每一个调用一 次
2、reduce阶段
(1)用户自定义的Reducer要继承自己的父类
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法中
(4)ReduceTask进程对每一组相同k的组调用一次reduce()方法
3.Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是 封装了MapReduce程序相关运行参数的job对象
WordCount案例(先是本地模式,然后将其打包上传到集群进行运行)首先准备一个数据文本文件数据如下:
接着开始对idea进行项目环境搭建,创建一个Maven工程,在pom.xml文件中添加依赖
maven-compiler-plugin
3.6.1
1.8
1.8
maven-assembly-plugin
jar-with-dependencies
make-assembly
package
single
8
8
org.apache.hadoop
hadoop-client
3.1.3
junit
junit
4.12
org.slf4j
slf4j-log4j12
1.7.30
并在项目的src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在 文件中填入。
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
开始编写Mapper类
package cn.itjdb.mapreduce.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper{ //将String类型转换为Text类型,优化代码将其放置在方法外,减少内存使用 private Text outK = new Text(); //而第二个参数是单独的Int类型,也要将其转换为IntWritable类型 private IntWritable outV=new IntWritable(1); //重写map方法 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取一行 String line = value.toString(); //划分数据 String[] words = line.split(" "); //遍历数据并将其写入 for (String word:words){ //封装outK outK.set(word); //而第二个参数是单独的Int类型,也要将其转换为IntWritable类型 //写出 context.write(outK,outV); } } }
编写reduce类
package cn.itjdb.mapreduce.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReduce extends Reducer{ int sum; IntWritable outV=new IntWritable(); //重写reduce方法 @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //Text key:对应于reduce输入的key;Iterable values:对应于reduce输入的values,他是一个集合 System.out.println(values.iterator().hasNext()); System.out.println(key); sum=0; //累加 //hello,(1,1,...) for (IntWritable value : values) { sum+=value.get(); } //封装 outV.set(sum); //写出 context.write(key,outV); } }
编写Driver类
package cn.itjdb.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1、获取job
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//2、获取jar包路径
job.setJarByClass(WordCountDriver.class);
//3、关联mapper和reduce
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);
//4、设置map输出kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5、设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6、设置输入路径和输出路径
FileInputFormat.setInputPaths(job,new Path("D:\input\inputword"));
FileOutputFormat.setOutputPath(job,new Path("D:\input\outputword"));
//7、提交job
boolean result=job.waitForCompletion(true);
System.exit(result? 0:1);
}
}
本地模式:就可以在D:\input\outputword路径下查看结果。
而对于集群模式,先将项目进行打包,然后将打包好得jar包上传到hadoop102的hadoop-3.1.3中,然后执行。结果可以http://hadoop102:9870/地址中查看结果
后续在进一步学习中。。。。。
2、hadoop序列化 3、MapReduce框架原理 4、Hadoop数据压缩


