栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

MapReduce的InputFormat组件与OutputFormat组件的实验

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

MapReduce的InputFormat组件与OutputFormat组件的实验

实验内容:
(1)使用MapReduce常用组件实现对常见的数据处理;
(2)给定若干个文档,使用Eclipse搭建maven工程,编写基于MapReduce的分布式程序,实现词频统计;
(3)给定两个文档A和B,现在需要将A和B的内容合并到C文档,并去除A和B中重复的内容,请编写MapReduce程序完成这一任务。
实验过程:
词频统计:
1.    创建maven工程导入所需的jar包
部分主要代码如下: 


org.apache.hadoop
    hadoop-client
    ${hadoop.version}


    org.apache.hadoop
    hadoop-common
    ${hadoop.version}


    org.apache.hadoop
    hadoop-hdfs
    ${hadoop.version}

2.    编写主程序代码

public class WordCountMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //定义MapReduce的8个步骤 然后通过Job组装
        //继承父类 Configured 获取conf
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, "WordCount");
        //如果需要集群运行,需要设置程序运行主类;若本地运行,则不需要该设置
        job.setJarByClass(WordCountMain.class);
        //通过Job组装MpaReduce的8个步骤
        //第一步:读取文件,解析成为key,value对(k1,v1)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///D:\input\数据文件"));
        //第二步:自定义map逻辑,接受(k1,v1) 转换成为新的 (k2,v2)
        job.setMapperClass(MyMapper.class);
        //设置map的输出的key,value对象(k2,v2)的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        //第七步:自定义reduce逻辑
        job.setReducerClass(MyReduce.class);
        //设置reduce的输出key,value对象(k3,v3)的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //第八步:
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///D:\output"));
        //提交整个job任务,等到整个任务结束之后,如果任务执行成功,返回true;如果执行失败,返回false
        boolean b = job.waitForCompletion(true);
        //使用三目运算符
        return b?0:1;
    }
//  MapReduce进行WordCount的入口类
    public static void main(String[] args) throws Exception {
        //获取配置文件
        Configuration configuration = new Configuration();
        //通过ToolRunner.run 执行程序的入口
        int run = ToolRunner.run(configuration, new WordCountMain(), args);
        //整个系统的退出
        System.exit(run);
    }
}

3.    编写mapper类
部分代码如下:

public class MyMapper extends Mapper {
    //准备好输出的: 
    Text text = new Text();
    IntWritable intWritable = new IntWritable(1);
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Text value: 对文件一行记录进行处理
        String line = value.toString();
        String[] split = line.split(",");
        for (String word : split){
            //context 为 
            text.set(word);
            context.write(text,intWritable);
        }
    }
}

4.    编写reduce类
部分代码如下:

public class MyReduce extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int result = 0;
        for(IntWritable value:values){
            int i = value.get();
            result += i;
        }
        IntWritable intWritable = new IntWritable(result);
        context.write(key,intWritable);
    }
}

数据去重:
1.mapper实现(部分主要代码)

    public class DedupMapper extends Mapper{
private static Text field=new Text ();
//<0,2018-3-1 a><11,2018-3-2b>
@Override
    protected void map (Longwritable key, Text value,
    Context context) throws IOException, InterruptedException {
    field=value;
    //NullWritable.get()方法设置空值
    context.write(field, NullWritable.get());
}
}


2.reduce实现(部分主要代码)

public class DedupReducer extends Reducer{
private static Text field=new Text ();
//<2018-3-1 a,null><2018-3-2 b,null><2018-3-3 c,null>
@Override
    protected void reduce(Text key,Iterablevalues,Context context)throws IOException,InterruptedException{
    context.write(field, NullWritable.get());
}
}


3.driver主代码实现(部分主要代码)

3.driver主代码实现(部分主要代码)
public class DedupDriver {
public static void main (String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(DedupDriver.class);
job.setMapperClass(DedupMapper.class);
job.setReducerClass(DedupReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path("D:\Dedup\input"));
//指定处理完成之后的结果所保存的位置
FileOutputFormat.setOutputPath(job,new Path("D:\Deduploutput"));
job.waitForCompletion(true);
}
}

实验结果:

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/340396.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号