栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【四】MapReduce入门

【四】MapReduce入门

一、概述

MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带的默认组件,整合成一个完整的分布式运算程序,并行的运行在一个Hadoop集群上。

1. 优缺点
  1. 易于编程:简单的实现了MR接口,就可以完成一个分布式程序。
  2. 良好的扩展性:当计算资源不能满足时,可以通过简单的增加机器来扩展计算能力
  3. 高容错性:若一台机器挂了,会自动将正在运行的计算任务转移到另外一个节点上运行,不至于这个任务运行失败
  4. 适合处理PB级别以上的离线数据,但不擅长ms级别的实时计算和流式计算
  5. 不擅长DAG(有向无环图)计算,因为每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
2. 核心思想

MR工作流程

  1. MR运算程序一般分成Map阶段和Reduce阶段
  2. Map阶段会以逻辑分片的理念对要计算的文件进行读取(128M)
  3. 第一个阶段的MapTask是对文件进行逻辑划分后进行分割处理,每个MT之间完全并行运行,每个MT之间不相关
  4. 第二阶段的ReduceTask是将Map阶段处理好的数据进行汇总,之间也是并行运行,每个RT之间不想关
  5. 一个MR程序只能包含一个Map阶段和Reduce阶段,若业务逻辑非常复杂,就只能多个MR程序穿行运行,但会产生大量的IO效率低下

MR进程

一个完整的MR程序在分布式运行时有三类实例继承:

  1. MrAppMaster:负责整个程序的过程调度及状态协调
  2. MapTask:负责Map阶段的真个数据处理流程
  3. ReduceTask:负责Reduce阶段的整个数据处理流程
二、实例 1. 编程规范

Mapper阶段

  1. 用户自定义的Mapper要继承自己的父类
  2. Mapper的输入数据是KV对的形式
  3. Mapper中的业务逻辑在Map()方法中实现
  4. Mapper的输出数据是KV对的形式
  5. 每一行数据对Map()方法调用一次

Reducer阶段

  1. 用户自定义的Reducer要继承自己的父类
  2. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
  3. Reducer的业务逻辑在reduce()方法中实现
  4. ReduceTask进程对每一组相同的K的KV组调用一次reduce()方法

Driver阶段

相当于Yarn集群的客户端,用于提交整个程序到Yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象

2. WordCount实现

实现Mapper类

public class WordCountMapper extends Mapper {    
	Text k = new Text();
	IntWritable v = new IntWritable(1);	
    
	@Override
	protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {		
		// 1 获取读取的一行数据
		String line = value.toString();	
		// 2 切割
		String[] words = line.split(" ");		
		// 3 输出
		for (String word : words) {			
			k.set(word);
			context.write(k, v);
		}
	}
}

实现Reducer类

public class WordcountReducer extends Reducer{
int sum;
IntWritable v = new IntWritable();
	@Override
	protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {	
		// 1 累加求和
		sum = 0;
		for (IntWritable count : values) {
			sum += count.get();
		}		
		// 2 输出
         v.set(sum);
		context.write(key,v);
	}
}

实现Driver类

public class WordcountDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		// 集群运行需要配置
		Configuration configuration = new Configuration();
		//设置HDFS NameNode的地址
        configuration.set("fs.defaultFS", "hdfs://hadoop102:9820");
        // 指定MapReduce运行在Yarn上
        configuration.set("mapreduce.framework.name","yarn");
        // 指定mapreduce可以在远程集群运行
        configuration.set("mapreduce.app-submission.cross-platform","true");
        //指定Yarn resourcemanager的位置
        configuration.set("yarn.resourcemanager.hostname","hadoop103");

        // 获取配置信息以及获取job对象
        Job job = Job.getInstance(configuration);

		// 2 关联本Driver程序的jar
		job.setJarByClass(WordcountDriver.class);

		// 3 关联Mapper和Reducer的jar
		job.setMapperClass(WordcountMapper.class);
		job.setReducerClass(WordcountReducer.class);

		// 4 设置Mapper输出的kv类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// 5 设置最终输出kv类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 6 设置输入和输出路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 提交job
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}
三、Hadoop序列化

Hadoop并没有采用Java的序列化机制,Java的序列化是一个重量级序列化框架,一个对象被序列化后会附带很多额外的信息,不便于在网络中传输,所以自己开发了一套序列化机制。

1. 自定义实现序列化接口
  1. 必须实现Writable接口

  2. 反序列化时,需要反射调用空参构造函数,所以必须空参构造

    public FlowBean() {
    	super();
    }
    
  3. 重写序列化方法

    @Override
    public void write(DataOutput out) throws IOException {
    	out.writeLong(upFlow);
    	out.writeLong(downFlow);
    	out.writeLong(sumFlow);
    }
    
  4. 重写反序列化方法

    @Override
    public void readFields(DataInput in) throws IOException {
    	upFlow = in.readLong();
    	downFlow = in.readLong();
    	sumFlow = in.readLong();
    }
    
  5. 注意反序列化的顺序和序列化的顺序完全一致

2. 实操

实现writable接口

// 1 实现writable接口
public class FlowBean implements Writable{

	private long upFlow;
	private long downFlow;
	private long sumFlow;
	
	//2  反序列化时,需要反射调用空参构造函数,所以必须有
	public FlowBean() {
		super();
	}

	public FlowBean(long upFlow, long downFlow) {
		super();
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}
	
	//3  写序列化方法
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}
	
	//4 反序列化方法
	//5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
	@Override
	public void readFields(DataInput in) throws IOException {
		this.upFlow  = in.readLong();
		this.downFlow = in.readLong();
		this.sumFlow = in.readLong();
	}

	// 6 编写toString方法,方便后续打印到文本
	@Override
	public String toString() {
		return upFlow + "t" + downFlow + "t" + sumFlow;
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}
}

编写Mapper类

public class FlowCountMapper extends Mapper{
	
	FlowBean v = new FlowBean();
	Text k = new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {
		
		// 1 获取一行
		String line = value.toString();
		
		// 2 切割字段
		String[] fields = line.split("t");
		
		// 3 封装对象
		// 取出手机号码
		String phoneNum = fields[1];

		// 取出上行流量和下行流量
		long upFlow = Long.parseLong(fields[fields.length - 3]);
		long downFlow = Long.parseLong(fields[fields.length - 2]);

		k.set(phoneNum);
		v.set(downFlow, upFlow);
		
		// 4 写出
		context.write(k, v);
	}
}

编写Reduce类

public class FlowCountReducer extends Reducer {

	@Override
	protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {

		long sum_upFlow = 0;
		long sum_downFlow = 0;

		// 1 遍历所用bean,将其中的上行流量,下行流量分别累加
		for (FlowBean flowBean : values) {
			sum_upFlow += flowBean.getUpFlow();
			sum_downFlow += flowBean.getDownFlow();
		}

		// 2 封装对象
		FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
		
		// 3 写出
		context.write(key, resultBean);
	}
}

编写Driver驱动类

public class FlowsumDriver {

	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
		
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "e:/input/inputflow", "e:/output1" };

		// 1 获取配置信息,或者job对象实例
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2 指定本程序的jar包所在的本地路径
		job.setJarByClass(FlowsumDriver.class);

		// 3 指定本业务job要使用的mapper/Reducer业务类
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		// 4 指定mapper输出数据的kv类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 5 指定最终输出的数据的kv类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		// 6 指定job的输入原始文件所在目录
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354938.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号