栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce程序样例

MapReduce程序样例

/**
 * MapReduce job that averages a numeric field per numeric key.
 *
 * <p>Each input line is split on {@code "::"}; field 1 becomes the key and
 * field 2 the value. The reducer emits the arithmetic mean of the values seen
 * for every key.
 *
 * <p>NOTE(review): the field layout looks like MovieLens ratings
 * ({@code userId::movieId::rating::timestamp}) - confirm against the input data.
 */
public class MovieClass {

	/**
	 * Parses one {@code "::"}-separated record and emits (field 1, field 2).
	 *
	 * <p>Declared {@code static} so that it can be instantiated without an
	 * enclosing {@code MovieClass} instance.
	 */
	public static class MovieMapper
			extends Mapper<LongWritable, Text, LongWritable, DoubleWritable> {

		/**
		 * Emits field 1 (as a long) and field 2 (as a double) of the record.
		 *
		 * @param key     input key supplied by the framework; not used
		 * @param value   one line of input text
		 * @param context sink for the emitted pair
		 * @throws NumberFormatException if field 1 or field 2 is not numeric
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("::");
			// Records with fewer than four fields are silently skipped.
			if (fields.length >= 4) {
				context.write(
						new LongWritable(Long.parseLong(fields[1])),
						new DoubleWritable(Double.parseDouble(fields[2])));
			}
		}

	}

	/**
	 * Averages all values received for a key.
	 *
	 * <p>Declared {@code static} for the same reason as {@link MovieMapper}.
	 */
	public static class MovieReducer
			extends Reducer<LongWritable, DoubleWritable, LongWritable, DoubleWritable> {

		/**
		 * Writes (key, mean of values).
		 *
		 * @param key     the grouping key
		 * @param value   every value emitted by the mappers for {@code key}
		 * @param context sink for the emitted pair
		 */
		@Override
		protected void reduce(LongWritable key, Iterable<DoubleWritable> value,
				Context context)
				throws IOException, InterruptedException {
			double sum = 0.0;
			int num = 0;
			for (DoubleWritable doubleWritable : value) {
				sum += doubleWritable.get();
				num++;
			}
			// Floating-point division: an empty group would yield NaN, not an exception.
			double avg = sum / num;
			context.write(key, new DoubleWritable(avg));
		}

	}

	/**
	 * Configures and runs the job, then exits with 0 on success or 1 on failure.
	 *
	 * <p>Reads from {@code /root/out}, writes to {@code /out}; an existing
	 * {@code /out} is deleted recursively before the job is submitted.
	 *
	 * @param args command-line arguments; not used
	 * @throws Exception if job setup, file-system access or job execution fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Use the hdfs:// scheme so the default file system is HDFS
		// (the original "http://" scheme did not match the HDFS address).
		conf.set("fs.defaultFS", "hdfs://192.168.197.23:9000");
		Job job = Job.getInstance(conf);

		job.setJarByClass(MovieClass.class);
		job.setMapperClass(MovieMapper.class);
		job.setReducerClass(MovieReducer.class);

		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(DoubleWritable.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(DoubleWritable.class);

		FileInputFormat.setInputPaths(job, new Path("/root/out"));
		FileSystem fileSystem = FileSystem.get(conf);
		Path outPath = new Path("/out");
		// Remove any previous output directory before the job writes to it.
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}

		FileOutputFormat.setOutputPath(job, outPath);

		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);

	}

}
WordCount

Mapper

package yyy;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class WordMapper extends Mapper {

		private Text k=new Text();
		private IntWritable v =new IntWritable(1);
		
		protected void map(LongWritable key,Text value,
				Mapper.Context context)
				throws IOException,InterruptedException
				{
					String line=value.toString();
					String[] words=line.split(" ");
					
					for (String word:words)
					{
						k.set(word);
						context.write(k,v);
					}
				}
}

Reducer

package yyy;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReduce extends Reducer {
	private IntWritable v= new IntWritable();
	protected void reduce(Text key,Iterable values,
			Reducer.Context context)
			throws IOException,InterruptedException{
		int sum=0;
		for (IntWritable count:values) {
			sum+=count.get();
		}
		v.set(sum);
		context.write(key, v);
	}

}

Driver

package yyy;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the word-count job built from {@code WordMapper} and
 * {@code WordReduce}.
 */
public class WordDriver {
	/**
	 * Configures and runs the job, then exits with 0 on success or 1 on failure.
	 *
	 * <p>Reads from {@code /wordinput} and writes to {@code /wordoutput1}; an
	 * existing output directory is deleted recursively before the job is
	 * submitted.
	 *
	 * @param args command-line arguments; not used
	 * @throws IOException            if job setup or file-system access fails
	 * @throws InterruptedException   if waiting for the job is interrupted
	 * @throws ClassNotFoundException if a job class cannot be resolved
	 */
	public static void main(String[] args)
			throws IOException, InterruptedException, ClassNotFoundException {
		Configuration configuration = new Configuration();
		configuration.set("fs.defaultFS", "hdfs://192.168.197.23:9000");
		Job job = Job.getInstance(configuration);

		job.setJarByClass(WordDriver.class);
		job.setMapperClass(WordMapper.class);
		job.setReducerClass(WordReduce.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// One Path object for both the cleanup and the job output, so the
		// directory that is cleared is always the directory that is written.
		// (Previously "/output" was deleted while the job wrote to "/wordoutput1".)
		Path outputPath = new Path("/wordoutput1");
		FileSystem fs = FileSystem.get(configuration);
		if (fs.exists(outputPath)) {
			// Recursive delete; replaces the deprecated single-argument delete(Path).
			fs.delete(outputPath, true);
		}
		FileInputFormat.setInputPaths(job, new Path("/wordinput"));
		FileOutputFormat.setOutputPath(job, outputPath);

		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/584904.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号