package com.huni.mapreduce.WordCount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //用来接收原始数据中每行的数据,并且转换成字符串类型 String line = value.toString(); //通过空格来切割每行数据,用来确定每个单词 String[] words = line.split(" "); //将单词遍历输出到mapper的缓冲区中 for(String word:words){ //再次又将Java中的String类型转换成Hadoop中的Text context.write(new Text(word), new IntWritable(1)); } } }
package com.huni.mapreduce.WordCount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReduce extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //Iterable values是将相同的key的value存储在一个Iterable类型中,相当于(key-list) int count = 0; // 1 汇总各个key的个数values(key, value-list) for (IntWritable value : values) { count+=value.get(); } // 2输出该key的总次数 context.write(key, new IntWritable(count)); } }
package com.huni.mapreduce.WordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the word-count MapReduce job.
 *
 * <p>Wires {@link WordCountMapper}, {@link WordCountReduce} and the custom
 * {@code WordCountPartition} together, packs input files into combined splits with
 * {@link CombineTextInputFormat}, and submits the job.
 */
public class WordCountJob {

    /** Input path used when no paths are given on the command line (local debugging). */
    private static final String DEFAULT_INPUT_PATH = "d://ap_event";

    /** Output path used when no paths are given on the command line (local debugging). */
    private static final String DEFAULT_OUTPUT_PATH = "d://out";

    /** Number of reduce tasks; must match the number of partitions WordCountPartition returns. */
    private static final int NUM_REDUCE_TASKS = 2;

    /** Minimum combined split size: 512 KB. Hadoop's split-size setters take bytes. */
    private static final long MIN_SPLIT_SIZE_BYTES = 512L * 1024;

    /** Maximum combined split size: 10 MB. Hadoop's split-size setters take bytes. */
    private static final long MAX_SPLIT_SIZE_BYTES = 10L * 1024 * 1024;

    /**
     * Configures the job, submits it, waits for it to finish, and exits the JVM with
     * status 0 on success or 1 on failure.
     *
     * @param args optional {@code <inputPath> <outputPath>}, e.g.
     *     {@code hdfs://master:9000/input/install.log hdfs://master:9000/outt}; when fewer
     *     than two arguments are given, {@value #DEFAULT_INPUT_PATH} and
     *     {@value #DEFAULT_OUTPUT_PATH} are used
     * @throws Exception if job configuration or submission fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the configuration and the Job object.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Locate the jar to ship via the class it contains.
        job.setJarByClass(WordCountJob.class);

        // 3. Mapper and reducer implementations.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);

        // 4. Map output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Final output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Custom partitioner; the reduce-task count must equal its number of partitions.
        job.setPartitionerClass(WordCountPartition.class);
        job.setNumReduceTasks(NUM_REDUCE_TASKS);

        // Combine input files into splits between 512 KB and 10 MB (values are in bytes).
        // NOTE(review): CombineFileInputFormat sizes splits mainly from the max size and the
        // per-node/per-rack minimums; confirm the generic min-size setting has an effect here.
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, MAX_SPLIT_SIZE_BYTES);
        CombineTextInputFormat.setMinInputSplitSize(job, MIN_SPLIT_SIZE_BYTES);

        // 6. Input and output paths; command-line arguments override the local defaults.
        String inputPath = args.length >= 2 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length >= 2 ? args[1] : DEFAULT_OUTPUT_PATH;
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 7. Submit, report progress, and block until the job completes.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
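2. 本地(Windows)调试时,需要下载与所用版本对应的 Hadoop 安装包并解压(我用的是 hadoop-2.9.2)。
3. 配置环境变量 HADOOP_HOME,指向解压后的 Hadoop 目录(例如 D:\hadoop-2.9.2)。
4. 将 %HADOOP_HOME%\bin 添加到 Path 环境变量中。
5. 下载 winutils.exe,放到 hadoop-2.9.2\bin 目录下(官方安装包不自带该文件)。
6. 完成以上配置后,即可在本地直接运行上面的代码。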
ps:所需软件下载地址:
链接:https://pan.baidu.com/s/1jYHmCIZGo6H8vCdpXuE1cQ
提取码:6666



