Goal: be able to customize where output goes, rather than being limited to a directory on the current file system; the destination can be a database or some other storage system.
OutputFormat is an abstract class, and its subclass FileOutputFormat is abstract as well; in the Driver we usually specify the output path through its static method setOutputPath().
- By default, the output format is TextOutputFormat.
- Steps to define a custom OutputFormat (a skeleton follows this list):
    - Write a class that extends FileOutputFormat;
    - Supply your own RecordWriter, overriding write(), the method that actually writes out the data.
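As a preview, the two classes fit together as in the sketch below; the names MyOutputFormat/MyRecordWriter and the Text/NullWritable type parameters are placeholders chosen to match the worked example that follows, not a fixed API:

```java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Skeleton only: the framework calls getRecordWriter() once per output task
// and then feeds every output record to write().
class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        return new MyRecordWriter(job); // pass the context so the writer can open streams
    }
}

class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    MyRecordWriter(TaskAttemptContext job) { /* open output streams here */ }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // decide where this record goes and write it out
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // close whatever the constructor opened
    }
}
```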
Requirement: filter the input log file; sites containing atguigu go to E:\Test\Log\log1.txt, and sites not containing atguigu go to E:\Test\Log\log2.txt.
Analysis:
- Define a class LogOutPutFormat that extends FileOutputFormat and override its getRecordWriter(TaskAttemptContext job) method; note that this method returns a RecordWriter.
- Define a LogRecordWriter and return it from getRecordWriter.
- LogRecordWriter must extend RecordWriter and override the write() and close() methods; TextOutputFormat is a good class to model it on.
- Pass the parameter of getRecordWriter(TaskAttemptContext job) into new LogRecordWriter(job), so the constructor can point the output streams at the right locations. In detail: FileSystem needs a Configuration, which we can get via job.getConfiguration() (see the snippet after this list).
- Override write() and, depending on the record, write it out to the appropriate destination.
- Close the resources.
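The constructor detail from the fourth bullet comes down to a few lines. A minimal standalone sketch (the class name FsFromConf is made up for illustration; inside the real writer the Configuration comes from job.getConfiguration()):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsFromConf {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();  // in the writer: job.getConfiguration()
        FileSystem fs = FileSystem.get(conf);      // local FS when running locally, HDFS on a cluster
        FSDataOutputStream out = fs.create(new Path("E:/Test/Log/log1.txt"));
        out.writeBytes("hello\n");
        out.close();
    }
}
```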
Mapper:
```java
package com.hpu.hadoop.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OutMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The whole log line is the output key; no parsing is needed here.
        context.write(value, NullWritable.get());
    }
}
```
Reducer:
```java
package com.hpu.hadoop.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OutReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Re-emit the key once per grouped value.
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
```
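Design note: each line travels as the key with a NullWritable value, so the shuffle groups identical lines together; the reducer's loop over values then re-emits each duplicate instead of collapsing repeated log lines into one.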
LogOutPutFormat:
```java
package com.hpu.hadoop.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutPutFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // Hand the job context to the writer so it can open its own streams.
        return new LogRecordWriter(job);
    }
}
```
LogRecordWriter:
```java
package com.hpu.hadoop.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream fsDataOutputStreama;
    private FSDataOutputStream fsDataOutputStreamb;

    public LogRecordWriter(TaskAttemptContext job) {
        Configuration conf = job.getConfiguration();
        try {
            // Open one stream per destination file.
            FileSystem fileSystem = FileSystem.get(conf);
            fsDataOutputStreama = fileSystem.create(new Path("E:\\Test\\Log\\log1.txt"));
            fsDataOutputStreamb = fileSystem.create(new Path("E:\\Test\\Log\\log2.txt"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String s = key.toString();
        // Route each line by whether it contains "atguigu".
        if (s.contains("atguigu")) {
            fsDataOutputStreama.writeBytes(s + "\n");
        } else {
            fsDataOutputStreamb.writeBytes(s + "\n");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStreams(fsDataOutputStreamb, fsDataOutputStreama);
    }
}
```
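Two details of this writer are worth noting. The output paths are hard-coded, so every task attempt writes to the same two files; that is fine for this single-reducer local exercise but would clash if the job ran with multiple reducers or speculative execution. Also, since the Driver below uses a default Configuration, FileSystem.get(conf) typically resolves to the local file system when the job runs locally, which is how the output lands on the E: drive.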
Driver:
```java
package com.hpu.hadoop.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OutDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(OutDriver.class);
        job.setMapperClass(OutMapper.class);
        job.setReducerClass(OutReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Plug in the custom OutputFormat.
        job.setOutputFormatClass(LogOutPutFormat.class);

        FileInputFormat.setInputPaths(job, new Path("E:\\Test\\input\\inputoutputformat"));
        // Even though we defined a custom OutputFormat, it extends FileOutputFormat,
        // and FileOutputFormat still writes a _SUCCESS file, so an output directory
        // must still be specified here.
        FileOutputFormat.setOutputPath(job, new Path("E:\\Test\\ll"));

        job.waitForCompletion(true);
    }
}
```
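A quick local sanity check (assuming a small sample log file sits in E:\Test\input\inputoutputformat): after waitForCompletion returns, E:\Test\Log\log1.txt should contain only the lines with atguigu and E:\Test\Log\log2.txt the remaining lines, while E:\Test\ll should hold just the _SUCCESS marker written by FileOutputFormat rather than the usual part-r-00000 data files, since every record was diverted by LogRecordWriter.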



