// ---------------------------------- JobMain: main program entry ----------------------------------
package com.demo01.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
public static void main(String[] args) throws Exception {
//这里执行完成,返回一个程序退出状态码 0成功
//这里设置configguration相当于给父类赋值了
int run = ToolRunner.run(new Configuration(),new JobMain(),args);
System.exit(run);
}
@Override
public int run(String[] strings) throws Exception {
//1.读取文件解析成value对
//第一个是configuration配置文件,第二个定义job的名字
Job job = Job.getInstance(super.getConf(),"XXX");
//设置程序入口类
job.setJarByClass(JobMain.class);
//设置job接收的的数据类型
job.setInputFormatClass(TextInputFormat.class);
//设置需要处理的文件
//hdfs集群下执行
// FileInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
//本地测试
FileInputFormat.addInputPath(job,new Path("file:///D:\dsj\baishi课件\hadoop\3、大数据离线第三天\3、大数据离线第三天\wordcount\input"));
//2.自定义mapper类
job.setMapperClass(WordCountMapper.class);
//设置key2和value2的类
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//7.自定义reduce逻辑
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//8.输出文件
//路径一定要不存在,存在就报错
// TextOutputFormat.setOutputPath(job,new Path("hdfs://node01/wordcountoutput"));
//本地测试
TextOutputFormat.setOutputPath(job,new Path("file:///D:\dsj\baishi课件\hadoop\3、大数据离线第三天\3、大数据离线第三天\wordcount\output"));
//提交任务到集群上
boolean b = job.waitForCompletion(true);
return b?0:1;
}
}
// ---------------------------------- Mapper ----------------------------------
package com.demo01.wordcount; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; //此处泛型hadoop对java基础类型进行了包装,加快网络传输, 4个参数代表 public class WordCountMapper extends Mapper{ //重写map方法:自定义k1 v1转换到k2 v2的方法 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //hive,sqoop,flume,hello String[] split = value.toString().split(","); //遍历k2和v2往下发送 for (String word : split) { Text k2 = new Text(word); LongWritable v2 = new LongWritable(1); context.write(k2,v2); } } }
// ---------------------------------- Reducer ----------------------------------
package com.demo01.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; //k2,v2,k3,v3 public class WordCountReduce extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int num = 0; for (LongWritable value : values) { //IntWritable这个类没有加方法,通过get()编程编程java类型 num += value.get(); } context.write(key,new LongWritable(num)); } }



