a,b,c,d
a,s,d,f
d,f,g,c 就如此格式,
代码如下,比 WordCount 还要简单一点,整体思路和 WordCount 基本相同。
package make.hadoop.com.four_column;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class four_column extends Configured implements Tool {// 1、自己的map类// 2、继承mapper类,<LongWritable, Text, Text,// IntWritable>输入的key,输入的value,输出的key,输出的valuepublic static class MyMapper extendsMapper<LongWritable, Text, Text, IntWritable> {private IntWritable MapOutputkey = new IntWritable(1);private Text MapOutputValue = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { String strs = value.toString();// 分割数据String str_four = strs.split(",")[3]; MapOutputValue.set(str_four);System.out.println(str_four);context.write(MapOutputValue, MapOutputkey); }}// 2、自己的reduce类,这里的输入就是map方法的输出public static class MyReduce extendsReducer<Text, IntWritable, Text, IntWritable> { IntWritable countvalue = new IntWritable(1); @Override// map类的map方法的数据输入到reduce类的group方法中,得到<text,it(1,1)>,再将这个数据输入到reduce方法中protected void reduce(Text inputkey, Iterable<IntWritable> inputvalue,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable i : inputvalue) {System.out.println(i.get());sum = sum + i.get();}// System.out.println("key: "+inputkey + "...."+sum);countvalue.set(sum);context.write(inputkey, countvalue);}}// 3运行类,run方法,在测试的时候使用main函数,调用这个类的run方法来运行 public int run(String[] args) throws Exception { Configuration conf = new 
Configuration(); Job job = Job.getInstance(this.getConf(), "four_column"); // set mainclassjob.setJarByClass(four_column.class); // set mapperjob.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class); // set reducerjob.setReducerClass(MyReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); // set pathPath inpath = new Path(args[0]);FileInputFormat.setInputPaths(job, inpath);Path outpath = new Path(args[1]);FileOutputFormat.setOutputPath(job, outpath);FileSystem fs = FileSystem.get(conf);// 存在路径就删除if (fs.exists(outpath)) {fs.delete(outpath, true);}job.setNumReduceTasks(1); boolean status = job.waitForCompletion(true); if (!status) {System.err.println("the job is error!!");} return status ? 0 : 1; }public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); int atatus;try {atatus = ToolRunner.run(conf, new four_column(), args);System.exit(atatus);} catch (Exception e) {e.printStackTrace();} }}



