请看下面这种输入格式:它用于在单个 map 任务中读取多个文件,传递给 mapper 的每条记录恰好对应一个(未拆分的)文件。WholeFileRecordReader 负责把一个文件的全部内容作为一个值发送:返回的键是 NullWritable,值是该文件的完整内容。现在,您可以用它来运行 MapReduce 作业,查看实际运行了多少个 mapper,并检查得到的输出是否正确。
每条记录都由一个 WholeFileRecordReader 构造。
/**
 * Input format that delivers every input file to the mapper as one whole record.
 *
 * <p>Keys are {@code NullWritable}; values are {@code Text} instances produced by
 * {@code WholeFileRecordReader}. Files are never split.
 */
public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

  /**
   * Reports that no file may be split, regardless of the arguments.
   *
   * @param context the job context (unused)
   * @param file the candidate file (unused)
   * @return always {@code false}
   */
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }

  /**
   * Builds a {@code CombineFileRecordReader} that uses {@code WholeFileRecordReader}
   * as the per-file reader class.
   *
   * @param split the split to read; must be a {@code CombineFileSplit}
   * @param context the task attempt context handed through to the reader
   * @return a reader over the given combined split
   * @throws IllegalArgumentException if {@code split} is not a {@code CombineFileSplit}
   * @throws IOException declared by the overridden method
   */
  @Override
  public RecordReader<NullWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    if (split instanceof CombineFileSplit) {
      CombineFileSplit combinedSplit = (CombineFileSplit) split;
      return new CombineFileRecordReader<NullWritable, Text>(
          combinedSplit, context, WholeFileRecordReader.class);
    }
    throw new IllegalArgumentException("split must be a CombineFileSplit");
  }
}
在上面可以使用WholeFileRecordReader,如下所示:
public class WholeFileRecordReader extends RecordReader<NullWritable, Text> { private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class); private final Path mFileToRead; private final long mFileLength; private final Configuration mConf; private boolean mProcessed; // private final Text mFileName; private final Text mFileText; public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context, Integer pathToProcess) { mProcessed = false; mFileToRead = fileSplit.getPath(pathToProcess); mFileLength = fileSplit.getLength(pathToProcess); mConf = context.getConfiguration(); assert 0 == fileSplit.getOffset(pathToProcess); if (LOG.isDebugEnabled()) { LOG.debug("FileToRead is: " + mFileToRead.toString()); LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths()); try { FileSystem fs = FileSystem.get(mConf); assert fs.getFileStatus(mFileToRead).getLen() == mFileLength; } catch (IOException ioe) { // oh well, I was just testing. } } // mFileName = new Text(); mFileText = new Text(); } @Override public void close() throws IOException { mFileText.clear(); } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public Text getCurrentValue() throws IOException, InterruptedException { return mFileText; } @Override public float getProgress() throws IOException, InterruptedException { return (mProcessed) ? (float) 1.0 : (float) 0.0; } @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // no-op. } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!mProcessed) { if (mFileLength > (long) Integer.MAX_VALUE) { throw new IOException("File is longer than Integer.MAX_VALUE."); } byte[] contents = new byte[(int) mFileLength]; FileSystem fs = mFileToRead.getFileSystem(mConf); FSDataInputStream in = null; try { // Set the contents of this file. 
in = fs.open(mFileToRead); IOUtils.readFully(in, contents, 0, contents.length); mFileText.set(contents, 0, contents.length); } finally { IOUtils.closeStream(in); } mProcessed = true; return true; } return false; }}以下是您的驱动程序代码:-
public int run(String[] arg) throws Exception { Configuration conf=getConf(); FileSystem fs = FileSystem.get(conf); //estimate reducers Job job = new Job(conf); job.setJarByClass(WholeFileDriver.class); job.setJobName("WholeFile"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(WholeFileInputFormat.class); job.setMapperClass(WholeFileMapper.class); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(arg[0])); Path output=new Path(arg[1]); try { fs.delete(output, true); } catch (IOException e) { LOG.warn("Failed to delete temporary path", e); } FileOutputFormat.setOutputPath(job, output); boolean ret=job.waitForCompletion(true); if(!ret){ throw new Exception("Job Failed"); }


