1、Overall approach
(1) The two CSV fields we analyze, id and price, are both defined as String; MapReduce reads and writes all data as (k, v) key/value pairs, so these fields travel inside a key and a value. The Reducer is a simple pass-through that writes each incoming pair straight out:
@Override
protected void reduce(Text key, Iterable<CsvBean> values, Context context) throws IOException, InterruptedException {
    for (CsvBean value : values) {
        context.write(key, value);
    }
}
(2) We therefore wrap price into a bean object (CsvBean), whose fields mirror the CSV columns:
// 4. Populate the bean and the output key
outV.setId(id);
outV.setAge(price);
outK.set(price);
(3) With the fields wrapped in an object, we define how the object is serialized and deserialized by overriding the Writable methods.
Overriding serialization: write() emits each field with writeUTF:
@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeUTF(id);
    dataOutput.writeUTF(price);
}
Overriding deserialization: readFields() reads each field back with readUTF, in the same order it was written:
@Override
public void readFields(DataInput dataInput) throws IOException {
    this.id = dataInput.readUTF();
    this.price = dataInput.readUTF();
}
(4) To control how results appear in the output file, override toString(). Since the price is already emitted as the key, the value part prints only a "," followed by the id:
@Override
public String toString() {
    return "," + id;
}
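The default TextOutputFormat writes one line per record as key + tab + value.toString(), so with price as the key and the toString() above, each output line is the price, a tab, then ",id". A small sketch of the resulting line format (the helper name and sample values are illustrative only):

```java
public class OutputLineDemo {
    // TextOutputFormat emits key + "\t" + value.toString() per record
    // (tab is the default key/value separator).
    public static String outputLine(String priceKey, String id) {
        return priceKey + "\t" + "," + id; // CsvBean.toString() contributes "," + id
    }

    public static void main(String[] args) {
        System.out.println(outputLine("7.19", "10")); // prints "7.19\t,10" (tab-separated)
    }
}
```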
(5) Finally, set the input and output paths for the CSV file (change these to wherever your own CSV file lives). Note that backslashes in Java string literals must be escaped:
// 6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("D:\\IDEA\\mapreduce\\steam\\input\\steam.csv"));
FileOutputFormat.setOutputPath(job, new Path("D:\\IDEA\\mapreduce\\steam\\output"));
2、The code
(1) The CsvBean wrapper class
package com.gis507.test.CsvSplit;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CsvBean implements Writable {

    private String id;
    private String price;

    public CsvBean() {
    }

    // note: the "age" parameter name is a leftover; it sets the price field
    public CsvBean(String id, String age) {
        this.id = id;
        this.price = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    // likewise, getAge/setAge actually read and write the price field
    public String getAge() {
        return price;
    }

    public void setAge(String age) {
        this.price = age;
    }

    @Override
    public String toString() {
        return "," + id;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(price);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.price = dataInput.readUTF();
    }
}
(2) The Mapper class, which reads in the id and price
package com.gis507.test.CsvSplit;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CsvSplitMapper extends Mapper<LongWritable, Text, Text, CsvBean> {

    private Text outK = new Text();
    private CsvBean outV = new CsvBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line as a String
        String line = value.toString();
        // 2. Split on commas
        String[] csvComments = line.split(",");
        // 3. Pick out the fields we need
        String id = csvComments[0];
        String price = csvComments[2];
        // 4. Populate the bean and the output key
        outV.setId(id);
        outV.setAge(price);
        outK.set(price);
        // 5. Emit
        context.write(outK, outV);
    }
}
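The splitting step can be illustrated in isolation. The sample row below is hypothetical (the actual steam.csv columns may differ); what matters is that index 0 holds the id and index 2 holds the price. Note that a plain split(",") would break on quoted fields that themselves contain commas:

```java
public class SplitDemo {
    // Extract the same columns the Mapper reads: index 0 (id) and index 2 (price).
    public static String[] idAndPrice(String line) {
        String[] cols = line.split(",");
        return new String[] { cols[0], cols[2] };
    }

    public static void main(String[] args) {
        // hypothetical sample row: id, name, price
        String[] r = idAndPrice("10,Counter-Strike,7.19");
        System.out.println(r[0] + " " + r[1]); // prints "10 7.19"
    }
}
```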
(3) The Reducer class
package com.gis507.test.CsvSplit;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CsvSplitReducer extends Reducer<Text, CsvBean, Text, CsvBean> {

    @Override
    protected void reduce(Text key, Iterable<CsvBean> values, Context context) throws IOException, InterruptedException {
        for (CsvBean value : values) {
            context.write(key, value);
        }
    }
}
(4) The Driver class CsvSplitDriver, which configures the job's input and output
package com.gis507.test.CsvSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class CsvSplitDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get a Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Associate the Driver class
        job.setJarByClass(CsvSplitDriver.class);
        // 3. Associate the Mapper and Reducer classes
        job.setMapperClass(CsvSplitMapper.class);
        job.setReducerClass(CsvSplitReducer.class);
        // 4. Set the Map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CsvBean.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CsvBean.class);
        // 6. Set the input and output paths (backslashes must be escaped)
        FileInputFormat.setInputPaths(job, new Path("D:\\IDEA\\mapreduce\\steam\\input\\steam.csv"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\IDEA\\mapreduce\\steam\\output"));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
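One caveat worth knowing: the map output key is the price as Text, and the shuffle sorts Text keys byte-by-byte (lexicographically), not numerically, so for example "10.00" sorts before "9.99". A numeric key type such as DoubleWritable, or a custom comparator, would be needed for true numeric ordering. A one-line demonstration of the lexicographic behavior:

```java
public class TextSortCaveat {
    public static void main(String[] args) {
        // Hadoop's Text keys compare byte-by-byte, which for ASCII strings
        // matches String.compareTo -- so "10.00" sorts before "9.99"
        System.out.println("10.00".compareTo("9.99") < 0); // prints "true"
    }
}
```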
3、Inspecting the output
4、Project source code: sorting CSV data by price with MapReduce



