MapReduce是一个分布式运算程序的编程框架,使用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
用户只关心业务逻辑。实现框架的接口。
<2>良好的扩展性可以动态增加服务器,解决计算资源不够的问题。
<3>高容错性任何一个节点宕机,可以将任务转移到其他节点
<4>适合海量数据计算TB/PB
几千台服务器共同计算
sparkstreaming ,flink擅长
<3>不适合DAG有向无环图计算spark适合
4. MapReduce核心思想<1>分为Map阶段和Reduce阶段
<2>Map阶段并发MapTask,完全并行运行,互不干扰
<3>Reduce阶段的并发ReduceTask,完全并行运行,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
<4>MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那么只能多个MapReduce程序,串行运行
负责整个程序的过程调度以及状态协调
<2>MapTask负责Map阶段的整个数据处理流程
<3>ReduceTask负责Reduce阶段的整个数据处理流程
6. WordCount案例 <1>Map类 <2>Reduce类 <3>Main方法 7. 常用数据序列化类型 8. MapReduce编程规范用户编写的程序分为三部分:Mapper、Reducer 和Driver
<1> Mapper- 用户自定义的Mapper要继承自己的父类Mapper的输入数据是kv对的形式Mapper中的业务逻辑写在map()方法中Mapper的输出数据是kv对的形式map()方法(MapTask进程)对每一个
- 用户自定义的Reducer要继承自己的父类Reducer的输入数据类型对应Mapper的输出数据类型,也是KV对Reducer的业务逻辑写在reduce()方法中ReduceTask进程对每一组相同k的
相当于YARN集群的客户端,用于提交我们整个程序到YRAN集群,提交的是封装了MapReduce程序相关运行参数的job对象。
- 获取配置信息,获取job对象实例指定本程序jar包所在的本地路径关联Mapper/Reducer业务类指定Mapper输出数据的kv类型指定最终输出的数据的kv类型指定job的输入原始文件所在目录指定job的输出结果所在目录(不能提前存在)提交作业
<1>在给定的文本文件中统计输出每个单词出现的总次数
<2>输入数据: hello.txt
<3>期望输出数据: 统计结果
2. 过程分析 <1>Mapper- 将MapTask传给我们的文本先转换为String根据空格将这一行切分成单词将单词输出为<单词,1>的形式
- 汇总各个key的个数输出该key的总次数
- 获取配置信息,获取job对象实例指定本程序jar包所在的本地路径关联Mapper/Reducer业务类指定Mapper输出数据的kv类型指定最终输出的数据的kv类型指定job的输入原始文件所在目录指定job的输出结果所在目录(不能提前存在)提交作业
<3>Log4J配置文件org.apache.hadoop hadoop-client 3.1.3 junit junit 4.12 org.slf4j slf4j-log4j12 1.7.30
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n<4>创建包路径
com.demo.mapreduce.wordcount4. 编写程序 <1>WordCountMapper
package com.demo.mapreduce.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper<2>WordCountReducer{ private Text outK = new Text(); private IntWritable outV = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { //1. 获取一行数据 String line = value.toString(); //2. 对数据进行切割 String[] words = line.split(" "); //3. 循环写出 for (String word : words) { outK.set(word); context.write(outK, outV); } } }
package com.demo.mapreduce.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer<3>WordCountDriver{ private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { //1. 累加 int sum = 0; for (IntWritable value : values) { sum += value.get(); } //2. 写出 outV.set(sum); context.write(key, outV); } }
package com.demo.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1. 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2. 设置jar包路径
job.setJarByClass(WordCountDriver.class);
//3. 关联mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4. 设置map的输出的key 和 value 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5. 设置最终输出的key 和 value 的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 6. 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path("E:\javaWordspaces\MapReduceDemohello.txt"));
//7.指定job的输出结果所在目录(不能提前存在)
FileOutputFormat.setOutputPath(job, new Path("E:\javaWordspaces\MapReduceDemo\result"));
// 8.提交作业
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
5. 集群运行
<1>maven打包插件
<2>修改WordCountDriver <3>mvn clean install <4>将jar包上传到linux服务器 <5>准备数据maven-compiler-plugin 3.6.1 1.8 1.8 maven-assembly-plugin jar-with-dependencies make-assembly package single
HDFS 下创建 /wcinput/word.txt
<6>执行命令hadoop jar MapReduceDemo-1.0-SNAPSHOT.jar com.demo.mapreduce.wordcount.WordCountDriver /wcinput/word.txt /wcoutput<7>结果



