public class DataCleanMap extends Mapper {
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
}
}
在map方法里,解析JSON数据,读取数据
String line = v1.toString();//获取每一行内容
JSonObject jsonObj = new JSonObject(line); //将字符串转换为JSON格式
String id = jsonObj.getString("uid"); //获取主播的id数据
int gold = jsonObj.getInt("gold");
int watchnumpv = jsonObj.getInt("watchnumpv");
int follower = jsonObj.getInt("follower");
int length = jsonObj.getInt("length");
过滤掉异常的数据(如<0的数据等),最后封装到上下文中
if (gold >= 0 && watchnumpv >= 0 && follower >= 0 && length >= 0) {
// 封装数据到Text中,最后写入context上下文中
Text k2 = new Text();
k2.set(id);
Text v2 = new Text();
v2.set(gold + "t" + watchnumpv + "t" + follower + "t" + length);
context.write(k2, v2);
}
(2)主类(入口类)
相当于一个程序的主函数,最先开始执行的地方
在java目录下新建一个DataCleanJob类,在主函数设置Job作业的参数信息
// 运行程序指令输入错误,直接退出程序
if (args.length != 2) {
System.exit(100);
}
Configuration conf = new Configuration(); //job需要的配置参数
Job job = Job.getInstance(conf); //创建一个job
job.setJarByClass(DataCleanJob.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定输出路径(只能是指定一个不存在的目录)
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//指定map相关代码
job.setMapperClass(DataCleanMap.class);
//指定K2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(Text.class);
//设置reduce的数量,0表示禁用reduce
job.setNumReduceTasks(0);
//提交作业job
job.waitForCompletion(true);