初试Hadoop之MapReduce
一、MapReduce的定义二、MR的优缺点三、认识与识别MR
3.1常用数据序列化类型3.2官方WordCount源码解析3.3手写MR的WordCount案例
一、MapReduce的定义MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并行运行在一个Hadoop集群上。
二、MR的优缺点优点:
它是框架,易于编写
良好的扩展性(通过对集群的机器的扩展,从而增加计算能力)
高容错性(MR如果执行失败,它会重新分配任务)
适合做海量数据的离线处理
缺点:
不擅长实时计算,无法像MySql一样,在毫秒或者秒级内返回结果不擅长流式计算,因为输入的数据集时静态的不擅长DAG(有向无环图)计算,使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下 三、认识与识别MR 3.1常用数据序列化类型
老规矩,先和我一起认识认识MR里面的数据类型吧,与Java做对比吧!
| Java类型 | Hadoop Writable类型 |
|---|---|
| Boolean | BooleanWritable |
| Byte | ByteWritable |
| Int | IntWritable |
| Float | FloatWritable |
| Long | LongWritable |
| Double | DoubleWritable |
| String | Text |
| Map | MapWritable |
| Array | ArrayWritable |
| Null | NullWritable |
有细心的小伙伴可能发现了,这个类型,其实没那么难记,除了String类型外,就只剩下在后面加上一个Writable就可了。
3.2官方WordCount源码解析看源码之前可以尝试着带着几个问题去看:
一个最基本的MR的程序是如何构成的?程序的运行流程是怎样的?
官方源码解析如下
package org.apache.hadoop.examples;
import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount
{
public static class TokenizerMapper extends Mapper
3.3手写MR的WordCount案例
3.3.1需求说明
在给定的文本文件中统计输出每一个单词出现的总次数
(1)输入数据hello.txt
atguigu atguigu ss ss cls cls jiao banzhang xue hadoop sgg sgg sgg nihao nihao bigdata0111 laiba
(2)期望输出数据
atguigu 2 banzhang 1 cls 2 hadoop 1 jiao 1 ss 2 xue 1
3.3.2 在IDEA创建项目并完成一些配置信息
创建Maven工程不会IDEA中创建Maven项目的可以参考我之前的博客Maven的安装以及在IDEA中的使用 ,
(1) 在pom.xml中添加一下配置信息
junit junit 4.12 org.apache.logging.log4j log4j-slf4j-impl 2.12.0 org.apache.hadoop hadoop-client 3.1.3
(2)在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入。
这个文件可以简单的理解成打印错误日志的文件。不配置不影响你的接下来操作。
3.3.3模拟实操MR
(1)编写mapper类
package com.mr_test.wordcount_hello; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class helloMapper extends Mapper{ Text outk = new Text(); IntWritable outv = new IntWritable(1); @Override protected void map(LongWritable key ,Text value ,Mapper .Context context) throws IOException, InterruptedException { //获取当前输入的数据 String line = value.toString(); //切割数据 String[] splits = line.split(" "); for (String split: splits) { outk.set(split); //遍历集合 封装 输出数据的key 和 value context.write(outk,outv); } } }
(2)编写Reducer类
package com.mr_test.wordcount_hello; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class helloReducer extends Reducer{ //为啥不在这里定义sum,因为这里算全局变量 // private int sum = 0; IntWritable outv = new IntWritable(); @Override protected void reduce(Text key ,Iterable values ,Reducer .Context context) throws IOException, InterruptedException { //每次一个key值就会重置 int sum=0; //获取每一个key出现的次数 for (IntWritable value: values) { sum += value.get(); } //输出 outv.set(sum); context.write(key,outv); } }
(3)编写Driver驱动类
package com.mr_test.wordcount_hello;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class helloDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//获取配置信息
Configuration conf = new Configuration();
//获取job对象
Job job = Job.getInstance(conf);
//关联Driver程序相关的jar
job.setJarByClass(helloDriver.class);
//关联Mapper相关的jar
job.setMapperClass(helloMapper.class);
//关联Reducer相关的jar
job.setReducerClass(helloReducer.class);
//设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入和输出路径
FileInputFormat.setInputPaths(job,new Path("D:\StudyFile\BigDate\02.大数据技术之Hadoop\03.代码\day04\MapReduce\src\main\java\com\atguigu\mr\wordcount\com.bigData.mapreduce\src\main\resources\wcinput\hello.txt"));
FileOutputFormat.setOutputPath(job,new Path("D:\StudyFile\BigDate\02.大数据技术之Hadoop\03.代码\day04\MapReduce\src\main\java\com\atguigu\mr\wordcount\com.bigData.mapreduce\src\main\java\testData\wcinput2"));
//提交job
job.waitForCompletion(true);
}
}
(4) 运行结果截图
最近忙着摸鱼哈哈,没咋写博客了,但是这两天应该会再次更新MapReduce的后续内容和Yarn的。一起加油呀!!!



