MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
在给定的文本文件中统计输出每一个单词出现的总次数
环境准备
(1)创建maven工程,MapReduceDemo
(2)在pom.xml文件中添加如下依赖
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
创建包名:com.atguigu.mapreduce.wordcount
编写Mapper类
package com.atguigu.mapreduce.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper{ private Text outK = new Text(); private IntWritable outV = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { //读取一行 String line = value.toString(); //切割 String[] split = line.split(" "); for (String word : split) { outK.set(word); context.write(outK,outV); } } }
编写Reducer类
package com.atguigu.mapreduce.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer{ //放外面,提高效率,每一次reduce都要创建一个IntWritable对象,耗费内存。 private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { //累加值 int sum = 0; for (IntWritable value : values) { sum += value.get(); } //写出 outV.set(sum); context.write(key,outV); } }
编写driver驱动类
package com.atguigu.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the WordCount job: wires the mapper and reducer together, sets
 * the key/value types and I/O paths, then submits the job and waits for it.
 *
 * Usage: {@code WordCountDriver [inputPath] [outputPath]} — both arguments are
 * optional and default to the tutorial's local Windows paths.
 */
public class WordCountDriver {

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create the job.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Locate the jar via the driver class.
        job.setJarByClass(WordCountDriver.class);

        // 3. Wire in mapper and reducer.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Map-output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Final (reducer) output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Input/output paths. Command-line args override the defaults.
        //    FIX: the original literals used single backslashes
        //    ("F:\11_input\inputword", "F:\hadoopDemo\output1") — "\h" is an
        //    illegal escape (compile error) and "\11" is an octal escape that
        //    silently produces a wrong path; backslashes must be doubled.
        String input = args.length > 0 ? args[0] : "F:\\11_input\\inputword";
        String output = args.length > 1 ? args[1] : "F:\\hadoopDemo\\output1";
        FileInputFormat.setInputPaths(job, new Path(input));
        // NOTE: the output directory must not already exist, or the job fails.
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7. Submit, wait for completion; exit 0 on success, 1 on failure.
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}



