import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCount {
    /**
     * Map phase: tokenizes each input line and emits (word, 1) for every word.
     *
     * Generic parameters: the first pair (LongWritable, Text) is the input
     * key/value type (byte offset, line of text); the second pair
     * (Text, LongWritable) is the output key/value type (word, count).
     * NOTE: the original declared the raw type {@code extends Mapper}, which
     * made the {@code @Override} of map() a compile error (erased signature is
     * map(Object, Object, Context)).
     */
    public static class WordCountMap extends Mapper<LongWritable, Text, Text, LongWritable> {
        // Reused output objects to avoid allocating per record.
        private final Text word = new Text();
        private static final LongWritable ONE = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on whitespace so multi-word lines are counted
            // word-by-word. For one-word-per-line input (the original's
            // implicit assumption) this emits exactly what the original did.
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    /**
     * Reduce phase: input key/value types match the map output types
     * (Text, LongWritable); sums all counts for a word and emits the total.
     */
    public static class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * Configures and submits the MapReduce job.
     *
     * @param args optional: args[0] = HDFS input path, args[1] = HDFS output
     *             path; defaults to the original hard-coded paths when absent.
     * @throws IOException            on filesystem/job-submission failure
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if the wait for completion is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Configure the MapReduce job through the Job object.
        Job job = Job.getInstance();
        job.setJobName("第一个mapreduce程序");
        // Mapper and reducer implementations.
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        // Locate the jar containing this job by class.
        job.setJarByClass(WordCount.class);
        // Map output types (must match the reducer's input types).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Final (reducer) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input/output paths: taken from the command line when supplied,
        // otherwise the original defaults are kept for backward compatibility.
        Path inputPath = new Path(args.length > 0 ? args[0] : "/data/test.txt");
        Path outputPath = new Path(args.length > 1 ? args[1] : "/output");

        // The output directory must not exist; delete it (recursively) if it
        // does, so the job can be re-run.
        FileSystem fileSystem = FileSystem.get(new Configuration());
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Propagate job success/failure as the process exit status
        // (the original discarded waitForCompletion's return value).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
How to run:
    hadoop jar <your-packaged-jar> <fully-qualified-main-class> [input-path] [output-path]
Notes:
1. The main class must be specified (the jar is located via setJarByClass).
2. The input data must first be uploaded to the expected HDFS path.
3. HDFS must be running; execute the command on the cluster/VM.
The MapReduce job will then run and write its results to the output path.



