Experiment objectives:
(1) Use common MapReduce components to implement typical data-processing tasks;
(2) Given several documents, set up a Maven project in Eclipse and write a distributed MapReduce program that performs word-frequency counting;
(3) Given two documents A and B, merge their contents into a document C while removing the lines duplicated between A and B; write a MapReduce program to complete this task.
Experiment procedure:
Word count:
1. Create a Maven project and import the required JARs
The key dependency declarations are as follows:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
</dependency>
2. Write the driver (main) program
public class WordCountMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//Define the eight steps of MapReduce, then assemble them via Job
//Get conf from the parent class Configured
Configuration conf = super.getConf();
Job job = Job.getInstance(conf, "WordCount");
//Needed when running on a cluster; can be omitted for local runs
job.setJarByClass(WordCountMain.class);
//Assemble the eight steps of MapReduce via Job
//Step 1: read the files and parse them into key/value pairs (k1,v1)
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\input\\数据文件"));
//Step 2: custom map logic that takes (k1,v1) and converts it into new (k2,v2) pairs
job.setMapperClass(MyMapper.class);
//Set the types of the map output key/value pair (k2,v2)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//Steps 3-6 (partition, sort, combine, group) use the framework defaults
//Step 7: custom reduce logic
job.setReducerClass(MyReduce.class);
//Set the types of the reduce output key/value pair (k3,v3)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//Step 8: write out the results
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\output"));
//Submit the job and wait for it to finish; returns true on success, false on failure
boolean b = job.waitForCompletion(true);
//Use the ternary operator to turn the result into an exit code
return b?0:1;
}
// Entry point of the WordCount MapReduce job
public static void main(String[] args) throws Exception {
//Load the configuration
Configuration configuration = new Configuration();
//Launch the program through ToolRunner.run
int run = ToolRunner.run(configuration, new WordCountMain(), args);
//Exit with the job's return code
System.exit(run);
}
}
3. Write the mapper class
The key code is as follows:
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//Reusable output objects
Text text = new Text();
IntWritable intWritable = new IntWritable(1);
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Text value: one line of the input file
String line = value.toString();
String[] split = line.split(",");
for (String word : split){
//Emit (word,1) through the context
text.set(word);
context.write(text,intWritable);
}
}
}
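The map step above can be checked without Hadoop: for one input line it splits on commas and emits a (word, 1) pair per token. A minimal plain-Java sketch of that logic (the class name `MapStepDemo` and the sample line are made up for illustration; real input comes from TextInputFormat):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MapStepDemo {
    // Mirrors MyMapper.map: split one line on "," and pair each token with 1
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split(",")) {
            out.add(new SimpleEntry<>(word, 1));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = map("hadoop,spark,hadoop");
        System.out.println(pairs); // [hadoop=1, spark=1, hadoop=1]
    }
}
```

Note that duplicate words produce duplicate pairs here; the framework, not the mapper, is responsible for grouping them later.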
4. Write the reduce class
The key code is as follows:
public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int result = 0;
for(IntWritable value:values){
int i = value.get();
result += i;
}
IntWritable intWritable = new IntWritable(result);
context.write(key,intWritable);
}
}
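Between map and reduce, the shuffle phase groups the map output by key, so MyReduce only has to sum each group's values. A plain-Java sketch of that group-and-sum step (the class name `ReduceStepDemo` and the word list are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceStepDemo {
    // Mirrors shuffle + MyReduce.reduce: group (word, 1) pairs and sum per word
    static Map<String, Integer> reduce(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = reduce(List.of("hadoop", "spark", "hadoop"));
        System.out.println(counts); // {hadoop=2, spark=1}
    }
}
```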
Data deduplication:
1. Mapper implementation (key code)
public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private static Text field = new Text();
//Input pairs look like <0, 2018-3-1 a>, <11, 2018-3-2 b>
@Override
protected void map(LongWritable key, Text value,
Context context) throws IOException, InterruptedException {
field.set(value.toString());
//NullWritable.get() supplies an empty value
context.write(field, NullWritable.get());
}
}
2. Reducer implementation (key code)
public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
//Grouped input looks like <2018-3-1 a, null>, <2018-3-2 b, null>, <2018-3-3 c, null>
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//Each distinct line arrives exactly once as a key, so writing the key removes duplicates
context.write(key, NullWritable.get());
}
}
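Deduplication falls out of the framework's key grouping: every repeated line becomes a single reducer key, and the reducer writes it once; keys also reach the reducer in sorted order. A plain-Java sketch of that effect using a sorted set (the class name `DedupDemo` and the sample lines are illustrative):

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class DedupDemo {
    // Mirrors map -> group-by-key -> reduce: each distinct line survives once, keys sorted
    static Set<String> dedup(List<String> lines) {
        return new TreeSet<>(lines);
    }

    public static void main(String[] args) {
        List<String> merged = List.of("2018-3-1 a", "2018-3-2 b", "2018-3-1 a");
        System.out.println(dedup(merged)); // [2018-3-1 a, 2018-3-2 b]
    }
}
```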
3. Driver implementation (key code)
public class DedupDriver {
public static void main (String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(DedupDriver.class);
job.setMapperClass(DedupMapper.class);
job.setReducerClass(DedupReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("D:\\Dedup\\input"));
//Directory where the results are stored once processing completes
FileOutputFormat.setOutputPath(job,new Path("D:\\Dedup\\output"));
job.waitForCompletion(true);
}
}
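End to end, the job realizes task (3): the input directory holds both A and B, and the output C is their merged, duplicate-free contents. A plain-Java sketch of that overall contract (the class name `MergeDedupDemo` and the file contents are illustrative; the real job additionally sorts C's lines by key):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class MergeDedupDemo {
    // C = lines of A followed by B, with duplicates removed (first occurrence kept)
    static List<String> merge(List<String> a, List<String> b) {
        LinkedHashSet<String> c = new LinkedHashSet<>(a);
        c.addAll(b);
        return new ArrayList<>(c);
    }

    public static void main(String[] args) {
        List<String> a = List.of("2018-3-1 a", "2018-3-2 b");
        List<String> b = List.of("2018-3-2 b", "2018-3-3 c");
        System.out.println(merge(a, b)); // [2018-3-1 a, 2018-3-2 b, 2018-3-3 c]
    }
}
```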
Experiment results:



