
Reading a file as a single record in Hadoop

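Take a look at this input format. It packs multiple files into one split, so a single map task reads several files, and each record passed to the mapper is one complete (unsplit) file. WholeFileRecordReader is responsible for emitting the contents of a file as a single value: the key is NullWritable and the value is the entire content of that file. You can then use it to run a MapReduce job, check how many mappers actually ran, and verify that the output is correct.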
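To see why fewer mappers may run than there are input files, here is a Hadoop-free sketch of the packing idea, under simplifying assumptions: whole files (never cut, because isSplitable returns false) are appended to the current split until its total size reaches a maximum, and then a new split starts. The real CombineFileInputFormat also groups files by node and rack for locality, which this sketch ignores; in Hadoop the limit typically comes from setMaxSplitSize or the mapreduce.input.fileinputformat.split.maxsize property. SplitPackingSketch, FileInfo and pack are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hadoop-free sketch: pack whole, unsplittable files into splits
 * (one map task per split). Locality is deliberately ignored.
 */
public class SplitPackingSketch {

    /** Hypothetical stand-in for an input file: a name and a size in bytes. */
    public static final class FileInfo {
        public final String name;
        public final long length;

        public FileInfo(String name, long length) {
            this.name = name;
            this.length = length;
        }

        @Override
        public String toString() {
            return name + "(" + length + ")";
        }
    }

    /**
     * Appends files to the current split and closes it once its total size
     * reaches maxSplitSize. Files are never cut, so a split may exceed the limit.
     */
    public static List<List<FileInfo>> pack(List<FileInfo> files, long maxSplitSize) {
        List<List<FileInfo>> splits = new ArrayList<>();
        List<FileInfo> current = new ArrayList<>();
        long currentSize = 0;
        for (FileInfo f : files) {
            current.add(f);
            currentSize += f.length;
            if (currentSize >= maxSplitSize) {
                splits.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
        }
        if (!current.isEmpty()) {
            splits.add(current); // leftover files form the last split
        }
        return splits;
    }

    public static void main(String[] args) {
        List<FileInfo> files = List.of(
                new FileInfo("a.txt", 40), new FileInfo("b.txt", 70),
                new FileInfo("c.txt", 10), new FileInfo("d.txt", 200),
                new FileInfo("e.txt", 5));
        List<List<FileInfo>> splits = pack(files, 100);
        System.out.println(files.size() + " files -> " + splits.size() + " splits"); // prints "5 files -> 3 splits"
    }
}
```

With five files and a 100-byte limit the sketch yields three splits, i.e. three map tasks; the second split (c.txt plus d.txt) is larger than the limit because d.txt is never cut.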

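The records are built by WholeFileRecordReader: CombineFileRecordReader creates one WholeFileRecordReader for each file in the combined split.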

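    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

    /** Packs many files into one split; each file becomes exactly one record. */
    public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

        // Never split a file: each one must reach a single reader intact.
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }

        // Delegate to one WholeFileRecordReader per file in the combined split.
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException {
            if (!(split instanceof CombineFileSplit)) {
                throw new IllegalArgumentException("split must be a CombineFileSplit");
            }
            return new CombineFileRecordReader<NullWritable, Text>(
                    (CombineFileSplit) split, context, WholeFileRecordReader.class);
        }
    }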
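WholeFileRecordReader needs a (CombineFileSplit, TaskAttemptContext, Integer) constructor because CombineFileRecordReader looks that constructor up by reflection and calls it once per file in the split, passing the file's index. The sketch below imitates that delegation pattern in plain Java so it runs without Hadoop; FakeSplit, FakeContext, PerFileReader and NameReader are hypothetical stand-ins, not Hadoop classes, and the real class also handles initialization and progress, which are omitted here.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch of reflective per-file delegation. All nested types
 * are hypothetical stand-ins for the Hadoop classes.
 */
public class CombineDelegationSketch {

    /** Stand-in for CombineFileSplit: an ordered list of file paths. */
    public static class FakeSplit {
        final List<String> paths;
        public FakeSplit(List<String> paths) { this.paths = paths; }
    }

    /** Stand-in for TaskAttemptContext. */
    public static class FakeContext {}

    /** Stand-in for RecordReader: yields values until exhausted. */
    public abstract static class PerFileReader {
        public abstract boolean next();
        public abstract String value();
    }

    /** Per-file reader exposing the constructor signature the delegator looks up. */
    public static class NameReader extends PerFileReader {
        private final String path;
        private boolean done = false;

        public NameReader(FakeSplit split, FakeContext ctx, Integer index) {
            this.path = split.paths.get(index); // index selects this reader's file
        }

        @Override public boolean next() {
            if (done) return false;
            done = true; // exactly one record per file
            return true;
        }

        @Override public String value() { return "record for " + path; }
    }

    /** Walks every path in the split, creating a fresh reader for each file. */
    public static List<String> readAll(FakeSplit split, FakeContext ctx,
                                       Class<? extends PerFileReader> readerClass) throws Exception {
        // Integer.class (not int.class) must match the declared parameter type exactly.
        Constructor<? extends PerFileReader> ctor =
                readerClass.getDeclaredConstructor(FakeSplit.class, FakeContext.class, Integer.class);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < split.paths.size(); i++) {
            PerFileReader reader = ctor.newInstance(split, ctx, Integer.valueOf(i));
            while (reader.next()) {
                out.add(reader.value());
            }
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        FakeSplit split = new FakeSplit(List.of("/in/a.txt", "/in/b.txt"));
        System.out.println(readAll(split, new FakeContext(), NameReader.class));
        // prints "[record for /in/a.txt, record for /in/b.txt]"
    }
}
```

Because the lookup uses Integer.class, declaring the last constructor parameter as a primitive int would make getDeclaredConstructor throw NoSuchMethodException.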

The WholeFileRecordReader used above can be implemented as follows:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.log4j.Logger;

    /** Reads one whole file of a CombineFileSplit as a single (NullWritable, Text) record. */
    public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
        private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);

        private final Path mFileToRead;
        private final long mFileLength;
        private final Configuration mConf;
        private boolean mProcessed;
        private final Text mFileText;

        // CombineFileRecordReader calls this constructor reflectively, once per file;
        // pathToProcess is the index of this reader's file within the split.
        public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
                Integer pathToProcess) {
            mProcessed = false;
            mFileToRead = fileSplit.getPath(pathToProcess);
            mFileLength = fileSplit.getLength(pathToProcess);
            mConf = context.getConfiguration();

            // Files are never split, so each one must start at offset 0.
            assert 0 == fileSplit.getOffset(pathToProcess);
            if (LOG.isDebugEnabled()) {
                LOG.debug("FileToRead is: " + mFileToRead.toString());
                LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths());
                try {
                    FileSystem fs = mFileToRead.getFileSystem(mConf);
                    assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
                } catch (IOException ioe) {
                    // Debug-only sanity check; ignore failures.
                }
            }
            mFileText = new Text();
        }

        @Override
        public void close() throws IOException {
            mFileText.clear();
        }

        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return mFileText;
        }

        // Progress is all-or-nothing: the single record has either been read or not.
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return mProcessed ? 1.0f : 0.0f;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // No-op: all state is set up in the constructor.
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!mProcessed) {
                // The whole file must fit into a single byte array.
                if (mFileLength > (long) Integer.MAX_VALUE) {
                    throw new IOException("File is longer than Integer.MAX_VALUE.");
                }
                byte[] contents = new byte[(int) mFileLength];
                FileSystem fs = mFileToRead.getFileSystem(mConf);
                FSDataInputStream in = null;
                try {
                    // Set the contents of this file as the record's value.
                    in = fs.open(mFileToRead);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    mFileText.set(contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                mProcessed = true;
                return true;
            }
            return false;
        }
    }
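The core of nextKeyValue() is "check the size, allocate one buffer, fill it completely". A local-filesystem sketch of the same steps, using java.nio and DataInputStream instead of Hadoop's FileSystem and IOUtils (WholeFileReadSketch and readWholeFile are hypothetical names):

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Local-filesystem sketch of reading one whole file into a single value. */
public class WholeFileReadSketch {

    public static byte[] readWholeFile(Path file) throws IOException {
        long length = Files.size(file);
        // Java arrays are int-indexed, so a larger file cannot become one value.
        if (length > Integer.MAX_VALUE) {
            throw new IOException("File is longer than Integer.MAX_VALUE: " + file);
        }
        byte[] contents = new byte[(int) length];
        try (DataInputStream in = new DataInputStream(Files.newInputStream(file))) {
            // readFully keeps reading until the buffer is full, or throws EOFException.
            in.readFully(contents);
        }
        return contents;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("whole", ".txt");
        Files.write(tmp, "line 1\nline 2\n".getBytes(StandardCharsets.UTF_8));
        byte[] value = readWholeFile(tmp);
        System.out.println(value.length + " bytes"); // prints "14 bytes"
        Files.delete(tmp);
    }
}
```

For binary input, BytesWritable is usually a more natural value type than Text, since Text assumes UTF-8 content.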

Here is the driver code:

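    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.log4j.Logger;

    public class WholeFileDriver extends Configured implements Tool {
        private static final Logger LOG = Logger.getLogger(WholeFileDriver.class);

        @Override
        public int run(String[] arg) throws Exception {
            Configuration conf = getConf();
            Job job = Job.getInstance(conf, "WholeFile");
            job.setJarByClass(WholeFileDriver.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(WholeFileInputFormat.class);
            job.setMapperClass(WholeFileMapper.class);
            job.setNumReduceTasks(0); // map-only job: mapper output is written directly

            FileInputFormat.addInputPath(job, new Path(arg[0]));
            Path output = new Path(arg[1]);
            // Remove any previous output; the job fails if the directory already exists.
            FileSystem fs = output.getFileSystem(conf);
            try {
                fs.delete(output, true);
            } catch (IOException e) {
                LOG.warn("Failed to delete output path " + output, e);
            }
            FileOutputFormat.setOutputPath(job, output);

            boolean ret = job.waitForCompletion(true);
            if (!ret) {
                throw new Exception("Job Failed");
            }
            return 0;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new WholeFileDriver(), args));
        }
    }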
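The driver deletes the output path first because FileOutputFormat rejects a job whose output directory already exists (it throws FileAlreadyExistsException when the output spec is checked at submission); the `true` argument to fs.delete makes the delete recursive. For a job run against the local filesystem, the same cleanup can be sketched with java.nio. DeleteOutputSketch and deleteRecursively are hypothetical helpers, not Hadoop APIs:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Local-filesystem analogue of fs.delete(output, true). */
public class DeleteOutputSketch {

    /** Deletes dir and everything below it; returns false if dir did not exist. */
    public static boolean deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse path order puts children before their parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("job-output");
        Files.createDirectories(out.resolve("_logs"));
        Files.write(out.resolve("part-m-00000"), new byte[] {42});
        System.out.println(deleteRecursively(out)); // prints "true"
        System.out.println(Files.exists(out));      // prints "false"
    }
}
```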


Reposted from www.mshxw.com; please credit the source when reprinting.
Article URL: https://www.mshxw.com/it/446532.html