栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

word count

word count

word count

package cn.edu.swpu.scs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * Hadoop MapReduce word count that pre-aggregates map output with a combiner before the
 * final reduce.
 *
 * <p>Usage: {@code WordCountWithCombine [generic options] <input path> <output path>}. The job is
 * launched through {@link ToolRunner}, so generic options are parsed into the configuration
 * returned by {@link #getConf()} before {@link #run(String[])} is called.
 */
public class WordCountWithCombine extends Configured implements Tool {

    /**
     * Command-line entry point; delegates to {@link #run(String[])} via {@link ToolRunner} and
     * exits the JVM with its return code.
     *
     * @param args generic options followed by the input path and the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main( String[] args ) throws Exception {
        int res = ToolRunner.run(new WordCountWithCombine(), args);
        System.exit(res);
    }

    /**
     * Debug value assigned in {@link #run(String[])}; not read by any active code.
     *
     * <p>NOTE(review): this is a static field of the driver JVM. In distributed mode, map/reduce
     * tasks presumably run in other JVMs and would not observe this assignment. Pass values to
     * tasks through the job {@link Configuration} instead. Kept public for compatibility.
     */
    public static String tempValue;

    /**
     * Configures the word-count job (map, combine, reduce; text in, text out) and blocks until
     * it finishes.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output directory
     * @return 0 if the job succeeded, -1 if it failed, 2 if fewer than two arguments were given
     * @throws Exception if the job cannot be submitted or waiting for it is interrupted
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: WordCountWithCombine <input path> <output path>");
            return 2;
        }
        String inputPath = args[0];
        String outputPath = args[1];
        System.out.println("args1=====" + inputPath);
        System.out.println("args2=====" + outputPath);
        tempValue = "this value is from run";

        // Use the configuration ToolRunner populated from generic options (-D, -conf, ...);
        // creating a fresh Configuration here would silently discard them. Fall back to a new
        // one only when run() is invoked directly without a configuration having been set.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        Job job = Job.getInstance(conf, "WordCountWithCombine");

        // Locate the jar to ship with the job via this class.
        job.setJarByClass(WordCountWithCombine.class);

        // Map emits (word, 1); Combine pre-sums on the map side; Reduce produces the totals.
        job.setMapperClass(Map.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);

        // Input: plain text files.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(inputPath));

        // Output: plain text files with Text keys (words) and IntWritable values (counts).
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        TextOutputFormat.setOutputPath(job, new Path(outputPath));

        // Submit and wait; 'true' asks Hadoop to print progress while waiting.
        return job.waitForCompletion(true) ? 0 : -1;
    }

    /**
     * Returns the sum of the given counts. Shared by {@link Combine} and {@link Reduce}.
     *
     * @param counts the counts collected for one key; iterated exactly once
     * @return the total of all counts
     */
    private static int sumCounts(Iterable<IntWritable> counts) {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        return sum;
    }

    /** Map step: emits {@code (token, 1)} for every whitespace-delimited token of a line. */
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        // Reused for every emitted key instead of allocating a new Text per token.
        private final Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // StringTokenizer splits on any run of whitespace (space, tab, newline, ...), so
            // repeated spaces and blank lines no longer yield empty-string "words".
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, ONE);
            }
        }
    }

    /** Combine step: pre-sums the counts for each word on the map side. */
    public static class Combine extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> val, Context context)
                throws IOException, InterruptedException {
            result.set(sumCounts(val));
            context.write(key, result);
        }
    }

    /** Reduce step: sums all (partial) counts for each word and writes the total. */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> val, Context context)
                throws IOException, InterruptedException {
            result.set(sumCounts(val));
            context.write(key, result);
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/604295.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号