1、新建maven项目,导入项目所需要的依赖
①通过File——New选择Project…
②选择Maven,Project SDK选择1.8。点击Next按钮
③GroupId写上公司名字的倒写。ArtifactId写上项目名(比如MapReduce)
④放到对应的文件夹下。并点击Finish。
⑤不要忘记点击右下角的Enable Auto-import。
⑥在pom.xml中导入相关的依赖。如下基本上直接拷贝即可。
pom.xml
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> mapreduce_wordcount junit log4j-core hadoop-common hadoop-hdfs hadoop-client |
2、Mapper阶段代码编写
在源码目录(src/main/java)下创建package,名为com.zhang.mapreduce。创建WcMapTask类,继承Mapper类,注意Mapper类有两个,这里选择第一个org.apache.hadoop.mapreduce包的。
并设置4个泛型参数。Mapper
注意Java的数据类型与Hadoop的数据类型的对应:
| Java数据类型 | Hadoop数据类型 |
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | IntWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| String | Text |
| map | MapWritable |
| array | ArrayWritable |
Hadoop的这些类型都实现了序列化的处理。虽然Java中本身也做了序列号的处理,但是Hadoop的序列化比Java的序列化更加有效。因为Hadoop就是做磁盘数据的保存和读取,为了使保存和读取的效率更快,Hadoop就做了这个优化。
继承Map类的子类中,需要重写map方法。这个就是map阶段对数据进行处理的逻辑。该方法的参数:
- 参数1:key。输入数据的key值。行偏移量。比如342
- 参数2: value。输入数据的key对应的value值。行偏移量对应的行内容。比如“hadoop spark”
- 参数3:context。经过map方法处理后,使用context对象往shuffing阶段推送数据。
WcMapTask.java
| package com.zhang.mapreduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WcMapTask extends Mapper
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(" "); for (String word : words){ context.write(new Text(word),new IntWritable(1)); } } } |
3、Reduce阶段代码编写
经过shuffling洗牌阶段后,reduce会接收 List(V2) 类型的数据,并输出 List(K3, V3) 类型的数据。接下来实现词频统计reduce阶段的代码。
在com.zhang.mapreduce包下创建WcReduceTask类,继承Reducer类,注意Reducer类有两个,这里选择org.apache.hadoop.mapreduce包的那个类。
并设置4个泛型参数。Reducer
继承Reduce类的子类中,需要重写reduce方法。这个就是reduce阶段对数据进行处理的逻辑。该方法的参数:
- 参数1:key。输入数据的key值。
- 参数2:value。输入数据的key单词次数的集合汇总。
- 参数3:context。经过reduce方法处理后,使用context对象往output阶段推送数据。
WcReduceTask.java
| package com.zhang.mapreduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WcReduceTask extends Reducer
@Override protected void reduce(Text key, Iterable int count = 0; for (IntWritable value : values){ count += value.get(); } context.write(key,new IntWritable(count)); } } |
注意:reduce()第一个参数Text类型是org.apache.hadoop.io包下的类。
4、定义Driver类, 描述 Job
在com.zhang.mapreduce包下创建WcMrJob类。
- 指明在词频统计的数据处理的业务中,有哪一个类执行了map的任务。
- 指明在词频统计的数据处理的业务中,有哪一个类执行了reduce的任务。
- 指明输入文件IO流的类型。
- 指明输出文件路径。
| package com.zhang.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WcMrJob { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //设置Driver类 job.setJarByClass(WcMrJob.class); //设置运行哪个map Task job.setMapperClass(WcMapTask.class); //设置运行哪个reduce Task job.setReducerClass(WcReduceTask.class); //设置map Task的输出的(key,value)的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置reduce Task的输出的(key,value)的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定要处理的数据所在的位置 FileInputFormat.setInputPaths(job,"hdfs://192.168.91.101:8020/wordcount/input/big.txt"); //指定处理之后的结果数据保存位置 FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.91.101:8020/wordcount/output")); //向yarn集群提交job,并且设置一直到执行结束 boolean res = job.waitForCompletion(true); //运行成功,通过System.exit(0),退出虚拟机,结束任务 System.exit(res?0:1); } } |
注意几个问题:
Text.class不要写成Test.class。它是org.apache.hadoop.io包下的类。
5、使用maven将项目打包成jar包
点击maven,在Lifecycle中双击选择package
此时,留意地下的Run窗口在进行打包。
打包后,在工程中生成target文件夹
其中mapreduce_wordcount-1.0-SNAPSHOT.jar就是打包好的jar包。
把 mapreduce_wordcount-1.0-SNAPSHOT.jar 拷贝出来。
6、MapReduce作业运行
采用集群运行模式 :
1. 将 MapReduce 程序提交给 Yarn 集群, 分发到很多的节点上并发执行
2. 处理的数据和输出结果应该位于 HDFS 文件系统
3. 提交集群的实现步骤: 将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动
运行hadoop作业:
1.将项目打包成mapreduce_wordcount-1.0-SNAPSHOT.jar,并且上传到hadoop101机器的/export/software 目录
| cd /export/softwares/ |
| rz -E |
2.运行写好的wordcount代码:
| hadoop jar mapreduce_wordcount-1.0-SNAPSHOT.jar com.zhang.mapreduce.WcMrJob |
提交作业,运行作业
执行后打开yarn的资源调度平台 http://192.168.91.101:8088/cluster
如下,比如现在的进度只有66.7%,要等全部运行完成。
等待一段时间。
到了这时候,就表示执行完成了。此时再访问yarn的资源调度平台 http://192.168.91.101:8088/cluster
3.查看生成的文件:
| hadoop fs -ls /wordcount/output |
或者查看 http://192.168.91.101:50070/explorer.html#/wordcount
留意wordcount目录下已经生成output结果了。里面有两个文件
- SUCCESS:这个是mapreduce作业执行成功的标志。
- part-r-00000:这个是执行成功后,结果数据保存的文件。
4.查看统计结果:
| hadoop fs -cat /wordcount/output/part-r-00000 |
可以看到词频统计的结果。
END



