Editor: VSCode
JDK version: JDK 1.8
Build tool: Maven
2. Project Structure and Dependencies
Project structure:
Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-common</artifactId>
        <version>2.7.5</version>
    </dependency>
</dependencies>
3. Dataset
Dataset download:
Link: https://pan.baidu.com/s/10h_3TL27zYO0_WRTTfn2CQ
Extraction code: 8888
Data preview:
Remove the header row, save the data as a .csv file, and upload it to HDFS. The storage path is up to you; just update the paths in the code below to match.
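The upload can also be scripted. Below is a minimal sketch using the HDFS Java API; the NameNode address matches the one used by the drivers below, but the local file name "/tmp/tennis.csv" is a hypothetical placeholder. (Equivalently, run hdfs dfs -put <local.csv> /user/hadoop/job_data from the shell.)

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: copy the cleaned CSV into HDFS.
// "/tmp/tennis.csv" is a hypothetical local path; the HDFS target is the
// input directory the jobs below read from.
public class upload_data {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.96.138:9000"), conf);
        fs.copyFromLocalFile(new Path("/tmp/tennis.csv"),
                new Path("/user/hadoop/job_data"));
        fs.close();
    }
}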
① MapReduce Job 1
Goal: job_counter — count each player's total number of aces.
File structure:
map_ace.java:
package job_counter;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class map_ace extends Mapper<LongWritable, Text, Text, LongWritable> {
    // The map method extracts each player's ace count from one CSV row
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Split the line and pick out the needed fields
        String[] line = value.toString().split(",");
        String name = line[3];
        String ace = line[15];
        // 2. Write (k2, v2) to the context
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        text.set(name);
        longWritable.set(Long.parseLong(ace));
        context.write(text, longWritable);
    }
}
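To make the column indices concrete, here is a toy check of the parsing logic outside Hadoop. The row below is entirely hypothetical; only the positions (index 3 = player name, index 15 = ace count) mirror the mapper above:

// Hypothetical standalone demo of the mapper's field extraction
public class map_ace_demo {
    public static void main(String[] args) {
        // Fabricated CSV row used purely for illustration
        String row = "0,1,2,Federer R.,4,5,6,7,8,9,10,11,12,13,14,11,16,17,0.18";
        String[] line = row.split(",");
        System.out.println(line[3] + " -> " + Long.parseLong(line[15])); // Federer R. -> 11
    }
}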
reduce_ace.java:
package job_counter;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reduce_ace extends Reducer<Text, LongWritable, Text, LongWritable> {
    // The reduce method sums all ace counts for one player
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Iterate over the values and accumulate the sum
        long point = 0;
        for (LongWritable value : values) {
            point = point + value.get();
        }
        // Write (k3, v3) to the context
        context.write(key, new LongWritable(point));
    }
}
main_ace.java:
package job_counter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class main_ace extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// 1. Create the job instance
Configuration conf=super.getConf();
Job job=Job.getInstance(conf,"main_ace");
// Required so Hadoop can locate the classes when running from a packaged jar
job.setJarByClass(main_ace.class);
// 2. Configure the job
// Step 1: specify the input format and path
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("hdfs://192.168.96.138:9000/user/hadoop/job_data"));
// Step 2: set the mapper
job.setMapperClass(map_ace.class);
// Map output key (k2) type
job.setMapOutputKeyClass(Text.class);
// Map output value (v2) type
job.setMapOutputValueClass(LongWritable.class);
// Steps 3-6 (shuffle: partition, sort, combine, group) use the defaults
// Step 7: set the reducer and its output types
job.setReducerClass(reduce_ace.class);
// Reduce output key (k3) type
job.setOutputKeyClass(Text.class);
// Reduce output value (v3) type
job.setOutputValueClass(LongWritable.class);
// Delete the output path if it already exists
Path output= new Path("hdfs://192.168.96.138:9000/user/hadoop/counter_out_ace");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output)) {
fs.delete(output, true);
}
// Step 8: specify the output format and path
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, output);
// Submit the job and wait for it to finish
boolean b1=job.waitForCompletion(true);
return b1 ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration=new Configuration();
//start job
int run=ToolRunner.run(configuration, new main_ace(), args);
System.exit(run);
}
}
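One optional refinement, not in the original driver: because reduce_ace computes a plain sum, which is associative and commutative, the same class can be reused as a combiner to pre-aggregate map output and shrink the shuffle:

// Optional addition to run(), before waitForCompletion (assumption, not in the
// original code). Safe here because summing ace counts is associative and commutative.
job.setCombinerClass(reduce_ace.class);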
② MapReduce Job 2
Goal: job_class — split the players into four groups by the quartiles of the %DF column.
File structure:
map_class.java:
package job_class;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class map_class extends Mapper<LongWritable, Text, Text, NullWritable> {
    // Pass each record through as the key; the quartile classification
    // happens in the custom partitioner
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Write the whole line as k2, with a null value
        context.write(value, NullWritable.get());
    }
}
part_class.java:
package job_class;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class part_class extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        // 1. Split the line and read the %DF field
        String[] split = text.toString().split(",");
        String df = split[18]; // index of the %DF column
        // Precomputed quartiles of %DF
        double low = 0.111111111111110;   // 25th percentile
        double zhong = 0.181818181818182; // 50th percentile (median)
        double hig = 0.300000000000000;   // 75th percentile
        // Compare df against the quartiles to pick a partition
        double v = Double.parseDouble(df);
        if (v <= low) {          // bottom 25%
            return 3;
        } else if (v <= zhong) { // above 25%, at most 50%
            return 2;
        } else if (v <= hig) {   // above 50%, at most 75%
            return 1;
        } else {                 // top 25%
            return 0;
        }
    }
}
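The three cut points above are hard-coded, presumably computed offline from the %DF column of the dataset. For reference, here is one way such quartiles could be derived; the nearest-rank percentile method and the toy numbers are assumptions, not the author's procedure:

import java.util.Arrays;

// Hypothetical offline helper: nearest-rank quartiles of a %DF sample
public class quartiles {
    static double percentile(double[] sorted, double p) {
        int idx = (int) Math.ceil(p * sorted.length) - 1; // nearest-rank method
        return sorted[Math.max(idx, 0)];
    }

    public static void main(String[] args) {
        // Toy values standing in for the real %DF column
        double[] df = {0.05, 0.10, 0.12, 0.15, 0.18, 0.20, 0.25, 0.30, 0.35};
        Arrays.sort(df);
        System.out.println("Q1=" + percentile(df, 0.25)
                + " Q2=" + percentile(df, 0.50)
                + " Q3=" + percentile(df, 0.75));
    }
}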
reduce_class.java:
package job_class;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reduce_class extends Reducer<Text, NullWritable, Text, NullWritable> {
    // Identity reduce: no processing, write each key straight back out.
    // Note: because the whole line is the key, exact duplicate lines collapse into one.
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
main_class.java:
package job_class;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class main_class extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// 1. Create the job instance
Configuration conf = super.getConf();
Job job = Job.getInstance(conf, "main_class");
// Required so Hadoop can locate the classes when running from a packaged jar
job.setJarByClass(main_class.class);
// 2. Configure the job
// Step 1: specify the input format and path
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("hdfs://192.168.96.138:9000/user/hadoop/job_data"));
// Step 2: set the mapper
job.setMapperClass(map_class.class);
// Map output key (k2) type
job.setMapOutputKeyClass(Text.class);
// Map output value (v2) type
job.setMapOutputValueClass(NullWritable.class);
// Step 3 (partition): use the custom partitioner part_class
job.setPartitionerClass(part_class.class);
// Four reduce tasks, one per quartile group
job.setNumReduceTasks(4);
// Step 7: set the reducer and its output types
job.setReducerClass(reduce_class.class);
// Reduce output key (k3) type
job.setOutputKeyClass(Text.class);
// Reduce output value (v3) type
job.setOutputValueClass(NullWritable.class);
// Delete the output path if it already exists
Path output = new Path("hdfs://192.168.96.138:9000/user/hadoop/class_out_%DF");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output)) {
fs.delete(output, true);
}
// Step 8: specify the output format and path
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, output);
// Submit the job and wait for it to finish
boolean b1 = job.waitForCompletion(true);
return b1 ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// start job
int run = ToolRunner.run(configuration, new main_class(), args);
System.exit(run);
}
}
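With four reduce tasks and the partition numbering above, the output directory should contain part-r-00000 through part-r-00003, where part-r-00000 holds the top quartile (%DF above the 75th percentile) and part-r-00003 the bottom one. Below is a quick sanity check that all four groups were written, reusing the cluster address and output path from the driver:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: list the part files produced by main_class and print their sizes
public class check_output {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.96.138:9000"), conf);
        for (FileStatus status : fs.listStatus(new Path("/user/hadoop/class_out_%DF"))) {
            System.out.println(status.getPath().getName() + "\t" + status.getLen() + " bytes");
        }
        fs.close();
    }
}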



