Adding the dependencies:
- Create a new Maven project in IDEA
- Add the dependencies
- Create the Mapper
- Create the Reducer
- Create the Driver
- Run it inside Ubuntu (the Maven environment needs to be set up beforehand)
In the pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.edu.nefu</groupId>
  <artifactId>mr</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>mr</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.8.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.2</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <!-- plugin versions pinned by the quickstart archetype -->
      <plugins>
        <plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin>
        <plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin>
        <plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin>
        <plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin>
        <plugin><artifactId>maven-jar-plugin</artifactId><version>3.0.2</version></plugin>
        <plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin>
        <plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin>
        <plugin><artifactId>maven-site-plugin</artifactId><version>3.7.1</version></plugin>
        <plugin><artifactId>maven-project-info-reports-plugin</artifactId><version>3.0.0</version></plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <!-- compile with JDK 1.8 -->
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <!-- build a fat jar containing all dependencies -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Create the Mapper class
// Package name is up to you
package com.edu.nefu;

// Note: the imported classes must be the new Apache (org.apache.hadoop.*) ones
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Map phase
// key in    -- the offset of the line (index)
// value in  -- the line, as a String
// key out   -- the word, as a String
// value out -- the count (1)
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line
        String line = value.toString();
        // 2. Split it into words
        String[] words = line.split(" ");
        // 3. Emit (word, 1) for every word
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

Create the Reducer class
// Package name is up to you
package com.edu.nefu;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1. Sum the counts for this word
        sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // 2. Emit (word, total)
        v.set(sum);
        context.write(key, v);
    }
}

Create the Driver class
package com.edu.nefu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration and create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar by the driver class
        job.setJarByClass(WordcountDriver.class);
        // 3. Set the Mapper and Reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
- Then run mvn clean package
- Copy the fat jar (jar-with-dependencies) to Ubuntu
- Note: the following daemons must all be running first:
· namenode
· datanode
· resourcemanager
· nodemanager
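On a stock Hadoop 2.x installation these are typically started with start-dfs.sh and start-yarn.sh, or one by one with hadoop-daemon.sh start namenode / hadoop-daemon.sh start datanode and yarn-daemon.sh start resourcemanager / yarn-daemon.sh start nodemanager (script names assume the standard Hadoop 2.7 sbin scripts; check your own installation).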
# Write a simple wordcount input file
vim wordcount.txt
hdfs dfs -mkdir -p /user/root/input
# Send it to the virtual machine and run:
hadoop jar ……
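For reference, with the assembly plugin above the fat jar would normally be named mr-1.0-SNAPSHOT-jar-with-dependencies.jar, so the submit command would look roughly like `hadoop jar mr-1.0-SNAPSHOT-jar-with-dependencies.jar com.edu.nefu.WordcountDriver /user/root/input /user/root/output` (jar name and paths are illustrative; adjust them to your build and HDFS layout, and note that the output directory must not already exist).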

Hadoop serialization example
- Requirement
· For each phone number, compute the total upstream traffic, total downstream traffic and the overall total traffic (each record also carries a status code)
· Input data: phone_data.txt
- Requirement analysis
· A typical extension of the wordcount example
· But it is hard to implement with Hadoop's built-in data types alone
· So a custom (serializable) bean is used
- Conditions the custom bean must satisfy
· It must implement the Writable interface
· Deserialization calls the no-arg constructor via reflection, so a no-arg constructor is required:
public FlowBean(){ super();}
· Override the serialization method (write)
· Override the deserialization method (readFields)
- The serialization order and the deserialization order must match exactly
· Think of it as a first-in, first-out queue
- To show the result in the output file, override toString(); the fields can be separated with "\t" to make later processing easier
- If the custom bean is transported as the key, it must also implement the Comparable interface, because the shuffle phase of the MapReduce framework requires keys to be sortable (a minimal compareTo sketch follows the FlowBean code below).
// Custom Writable bean
package com.edu.nefu;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // no-arg constructor, required so the framework can create the bean via reflection
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
    }

    @Override
    public String toString() {
        return this.upFlow + ":" + this.downFlow + ":" + this.sumFlow;
    }

    // deserialization: read the fields in exactly the same order they were written
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void set(long u, long d) {
        this.upFlow = u;
        this.downFlow = d;
        this.sumFlow = u + d;
    }
}
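FlowBean above is only ever used as a value, so implementing Writable is enough. If the bean had to travel as the key, it would implement WritableComparable instead, as noted in the requirements. A minimal sketch of what that could look like — the class name FlowBeanKey and the "sort by descending total traffic" rule are only illustrations, not part of the original case:

// Sketch only: a key-ready variant of the bean (illustrative, not part of the original case)
package com.edu.nefu;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBeanKey implements WritableComparable<FlowBeanKey> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // no-arg constructor for reflection, as with FlowBean
    public FlowBeanKey() {
        super();
    }

    public void set(long u, long d) {
        this.upFlow = u;
        this.downFlow = d;
        this.sumFlow = u + d;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // The shuffle sorts keys with this method; descending total traffic is just an example rule
    @Override
    public int compareTo(FlowBeanKey o) {
        return Long.compare(o.sumFlow, this.sumFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}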
// Rewrite the wordcount Mapper
package com.edu.nefu;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // split the record into fields
        String line = value.toString();
        String[] fields = line.split(" ");

        Text k = new Text();
        FlowBean v = new FlowBean();

        // fields[1] is the phone number; the third- and second-to-last fields are upFlow and downFlow
        k.set(fields[1]);
        v.setUpFlow(Long.parseLong(fields[fields.length - 3]));
        v.setDownFlow(Long.parseLong(fields[fields.length - 2]));
        v.setSumFlow(Long.parseLong(fields[fields.length - 3]) + Long.parseLong(fields[fields.length - 2]));

        context.write(k, v);
    }
}
// Rewrite the Reducer
package com.edu.nefu;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    public void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long total_upFlow = 0;
        long total_downFlow = 0;

        // accumulate the up/down traffic for this phone number
        for (FlowBean bean : values) {
            total_upFlow += bean.getUpFlow();
            total_downFlow += bean.getDownFlow();
        }

        FlowBean v = new FlowBean();
        v.set(total_upFlow, total_downFlow);
        context.write(key, v);
    }
}
// Rewrite the Driver
package com.edu.nefu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowBeanDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration and create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar by the driver class
        job.setJarByClass(FlowBeanDriver.class);
        // 3. Set the Mapper and Reducer classes
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Summary
· This only rewrites the FlowBean class on top of the wordcount example
· The Driver as a whole does not change much
· It is a direct extension of the wordcount example

Theory: MapReduce input splits

1. Splits and the mechanism that decides MapTask parallelism
- Motivation
· The MapTask parallelism determines how concurrently the Map phase runs, and therefore affects the processing speed of the whole Job
· How many tasks process the Map data in parallel directly affects the concurrent processing capacity of the cluster
- The mechanism that decides MapTask parallelism
· Data block: a Block is how HDFS physically divides the data into chunks
· Data split: a split divides the input only logically; the data is not physically cut into pieces on disk
- Blocks are cut according to the BlockSize
- The Map-phase parallelism of a Job is decided by the number of splits computed by the client when the Job is submitted
- Each split is assigned to one MapTask instance for processing
- By default, the split size is one BlockSize
- Splitting does not consider the data set as a whole; each file is split separately
2. The FileInputFormat split procedure
- First, find the path where the data is stored
- Iterate over (and plan splits for) every file under that directory
- Start with the first file:
· Get the file size
· Compute the split size
· The default split size = BlockSize
· Start cutting: before each cut, check whether the remaining part is larger than 1.1 times the split size; if it is not, do not cut again and the remainder becomes the last split (a favorite exam question, as sketched in the code below)
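A standalone sketch of the rule described above. It mirrors the idea behind FileInputFormat's split planning but is not the actual Hadoop source; the class name SplitPlanner and the example file sizes are only for illustration:

// Illustration of the split-planning rule (not the real FileInputFormat code)
public class SplitPlanner {

    // another cut is only made while remainder / splitSize exceeds this factor
    private static final double SPLIT_SLOP = 1.1;

    // split size = max(minSize, min(maxSize, blockSize)) -> normally just the block size
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // plan splits for a single file of length fileLen and print their offset/length
    static void planSplits(long fileLen, long splitSize) {
        long bytesRemaining = fileLen;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            long offset = fileLen - bytesRemaining;
            System.out.println("split: offset=" + offset + " length=" + splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            // the remainder (at most 1.1 * splitSize) becomes the last split and is not cut again
            System.out.println("split: offset=" + (fileLen - bytesRemaining) + " length=" + bytesRemaining);
        }
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;               // 128 MB block
        long splitSize = computeSplitSize(blockSize, 1, Long.MAX_VALUE);
        planSplits(129L * 1024 * 1024, splitSize);          // 129 MB file -> one 129 MB split (129/128 <= 1.1)
        planSplits(300L * 1024 * 1024, splitSize);          // 300 MB file -> 128 MB + 128 MB + 44 MB
    }
}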



