import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MovieClass {

    // Emits (movieId, rating) for each record; input lines are "::"-delimited,
    // with the movie ID in field 1 and the rating in field 2.
    // The inner classes must be static so Hadoop can instantiate them by reflection.
    public static class MovieMapper extends Mapper<LongWritable, Text, LongWritable, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split("::");
            if (split.length >= 4) {
                context.write(new LongWritable(Long.parseLong(split[1])),
                        new DoubleWritable(Double.parseDouble(split[2])));
            }
        }
    }

    // Averages all ratings received for one movie ID.
    public static class MovieReducer extends Reducer<LongWritable, DoubleWritable, LongWritable, DoubleWritable> {
        @Override
        protected void reduce(LongWritable key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            int num = 0;
            for (DoubleWritable rating : values) {
                sum += rating.get();
                num++;
            }
            double avg = sum / num;
            context.write(key, new DoubleWritable(avg));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.197.23:9000"); // hdfs scheme, not http
        Job job = Job.getInstance(conf);
        job.setJarByClass(MovieClass.class);
        job.setMapperClass(MovieMapper.class);
        job.setReducerClass(MovieReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/root/out"));
        // Delete a stale output directory first; the job fails on startup if it exists.
        FileSystem fileSystem = FileSystem.get(conf);
        Path outPath = new Path("/out");
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }
}
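The mapper assumes MovieLens-style ratings input, one record per line in the form UserID::MovieID::Rating::Timestamp. The sketch below is a minimal local check of the parsing logic; the sample line is purely hypothetical.

public class SplitCheck {
    public static void main(String[] args) {
        String line = "1::1193::5::978300760"; // hypothetical sample record
        String[] split = line.split("::");
        if (split.length >= 4) {
            // Field 1 is the movie ID, field 2 the rating, as in MovieMapper.
            System.out.println(Long.parseLong(split[1]) + "\t" + Double.parseDouble(split[2]));
        }
    }
}

Note that MovieReducer cannot double as a combiner: an average of per-mapper averages is not the overall average, so every raw rating must reach the reducer.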
WordCount
Mapper
package yyy;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused output objects avoid allocating a new key/value pair per word.
    private Text k = new Text();
    private IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            k.set(word);
            context.write(k, v); // emit (word, 1)
        }
    }
}
Reducer
package yyy;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        v.set(sum); // total occurrences of this word
        context.write(key, v);
    }
}
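Because the counts are simply summed, and summation is associative and commutative, the same reducer class can also run as a combiner to shrink shuffle traffic; a minimal sketch, one extra line in the driver below:

job.setCombinerClass(WordReduce.class);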
Driver
package yyy;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordDriver {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://192.168.197.23:9000");
        Job job = Job.getInstance(configuration);
        // Uncomment to run as the HDFS superuser from a development machine:
        // System.setProperty("HADOOP_USER_NAME", "root");
        job.setJarByClass(WordDriver.class);
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Check and delete the directory the job actually writes to,
        // so a rerun does not fail with "output directory already exists".
        FileSystem fs = FileSystem.get(configuration);
        Path outPath = new Path("/wordoutput1");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileInputFormat.setInputPaths(job, new Path("/wordinput"));
        FileOutputFormat.setOutputPath(job, outPath);
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
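Hard-coded paths make the driver awkward to reuse. A common pattern is to take them from the command line via ToolRunner, which also parses generic Hadoop options such as -D key=value; a minimal sketch, assuming the input and output paths arrive as args[0] and args[1] (the class name WordDriverTool is hypothetical):

package yyy;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordDriverTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains any -D options parsed by ToolRunner.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WordDriverTool.class);
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordDriverTool(), args));
    }
}

It would be submitted with hadoop jar wordcount.jar yyy.WordDriverTool /wordinput /wordoutput1 (the jar name is hypothetical).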