栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark+hadoop读取数据源码

Spark+hadoop读取数据源码

package com.jack.rdd.create;



package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.Lists;


@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat extends InputFormat {
    /**
     * Configuration key holding the job's input paths. Written by
     * {@code setInputPaths} and {@code addInputPath} as escaped path strings
     * joined by commas.
     */
    public static final String INPUT_DIR =
            "mapreduce.input.fileinputformat.inputdir";
    /**
     * Configuration key for the maximum split size; read by
     * {@code getMaxSplitSize} with a default of {@code Long.MAX_VALUE}.
     */
    public static final String SPLIT_MAXSIZE =
            "mapreduce.input.fileinputformat.split.maxsize";
    /**
     * Configuration key for the minimum split size; read by
     * {@code getMinSplitSize} with a default of {@code 1L}.
     */
    public static final String SPLIT_MINSIZE =
            "mapreduce.input.fileinputformat.split.minsize";
    /**
     * Configuration key naming the user-supplied {@code PathFilter} class;
     * written by {@code setInputPathFilter}, read by {@code getInputPathFilter}.
     */
    public static final String PATHFILTER_CLASS =
            "mapreduce.input.pathFilter.class";
    /**
     * Configuration key under which {@code getSplits} records how many input
     * files were listed.
     */
    public static final String NUM_INPUT_FILES =
            "mapreduce.input.fileinputformat.numinputfiles";
    /**
     * Configuration key for the recursive-listing flag; read by
     * {@code getInputDirRecursive} with a default of {@code false}.
     */
    public static final String INPUT_DIR_RECURSIVE =
            "mapreduce.input.fileinputformat.input.dir.recursive";
    /**
     * Configuration key for the number of threads used when listing input
     * files. {@code listStatus} takes the single-threaded path when the value
     * is exactly 1.
     */
    public static final String LIST_STATUS_NUM_THREADS =
            "mapreduce.input.fileinputformat.list-status.num-threads";
    /** Value used for {@link #LIST_STATUS_NUM_THREADS} when the key is not set. */
    public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

    /** Logger shared by the static and instance methods of this class. */
    private static final Log LOG = LogFactory.getLog(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.class);

    // getSplits only carves another full-size split while
    // bytesRemaining / splitSize exceeds this ratio, so the final split of a
    // file may be up to 10% larger than splitSize.
    private static final double SPLIT_SLOP = 1.1;   // 10% slop

    /**
     * Counter enum with the single constant {@code BYTES_READ}.
     * It is marked deprecated, and nothing in the visible part of this class
     * refers to it.
     */
    @Deprecated
    public static enum Counter {
        BYTES_READ
    }

    /**
     * Filter that rejects "hidden" entries: any path whose final name
     * component starts with an underscore or a dot.
     */
    private static final PathFilter hiddenFileFilter = new PathFilter() {
        @Override
        public boolean accept(Path candidate) {
            final String baseName = candidate.getName();
            if (baseName.startsWith("_")) {
                return false;
            }
            return !baseName.startsWith(".");
        }
    };

    
    private static class MultiPathFilter implements PathFilter {
        private List filters;

        public MultiPathFilter(List filters) {
            this.filters = filters;
        }

        public boolean accept(Path path) {
            for (PathFilter filter : filters) {
                if (!filter.accept(path)) {
                    return false;
                }
            }
            return true;
        }
    }

    
    /**
     * Stores the recursive-listing flag in the job's configuration under
     * {@link #INPUT_DIR_RECURSIVE}.
     *
     * @param job               the job whose configuration is modified
     * @param inputDirRecursive the flag value to store
     */
    public static void setInputDirRecursive(Job job,
                                            boolean inputDirRecursive) {
        final Configuration jobConf = job.getConfiguration();
        jobConf.setBoolean(INPUT_DIR_RECURSIVE, inputDirRecursive);
    }

    
    /**
     * Reads the recursive-listing flag from the job's configuration.
     *
     * @param job the job context to read from
     * @return the value stored under {@link #INPUT_DIR_RECURSIVE}, or
     *         {@code false} when the key is not set
     */
    public static boolean getInputDirRecursive(JobContext job) {
        final Configuration jobConf = job.getConfiguration();
        final boolean recurse = jobConf.getBoolean(INPUT_DIR_RECURSIVE, false);
        return recurse;
    }

    
    /**
     * Lower bound on the split size imposed by this format. {@code getSplits}
     * uses the larger of this value and {@code getMinSplitSize(job)}.
     *
     * @return 1 in this implementation
     */
    protected long getFormatMinSplitSize() {
        final long smallestSplit = 1L;
        return smallestSplit;
    }

    
    /**
     * Tells {@code getSplits} whether the given file may be cut into several
     * splits. This implementation ignores both arguments.
     *
     * @param context  the job context (unused here)
     * @param filename the file being considered (unused here)
     * @return always {@code true} in this implementation
     */
    protected boolean isSplitable(JobContext context, Path filename) {
        final boolean splittable = true;
        return splittable;
    }

    
    /**
     * Registers a path-filter class for the job by storing it under
     * {@link #PATHFILTER_CLASS}, declared against the {@code PathFilter}
     * interface.
     *
     * @param job    the job whose configuration is modified
     * @param filter the filter class to register
     */
    public static void setInputPathFilter(Job job,
                                          Class filter) {
        final Configuration jobConf = job.getConfiguration();
        jobConf.setClass(PATHFILTER_CLASS, filter, PathFilter.class);
    }

    
    /**
     * Stores the minimum split size in the job's configuration under
     * {@link #SPLIT_MINSIZE}.
     *
     * @param job  the job whose configuration is modified
     * @param size the minimum split size to store
     */
    public static void setMinInputSplitSize(Job job,
                                            long size) {
        final Configuration jobConf = job.getConfiguration();
        jobConf.setLong(SPLIT_MINSIZE, size);
    }

    
    /**
     * Reads the configured minimum split size.
     *
     * @param job the job context to read from
     * @return the value stored under {@link #SPLIT_MINSIZE}, or {@code 1}
     *         when the key is not set
     */
    public static long getMinSplitSize(JobContext job) {
        final Configuration jobConf = job.getConfiguration();
        final long configuredMin = jobConf.getLong(SPLIT_MINSIZE, 1L);
        return configuredMin;
    }

    
    /**
     * Stores the maximum split size in the job's configuration under
     * {@link #SPLIT_MAXSIZE}.
     *
     * @param job  the job whose configuration is modified
     * @param size the maximum split size to store
     */
    public static void setMaxInputSplitSize(Job job,
                                            long size) {
        final Configuration jobConf = job.getConfiguration();
        jobConf.setLong(SPLIT_MAXSIZE, size);
    }

    
    /**
     * Reads the configured maximum split size.
     *
     * @param context the job context to read from
     * @return the value stored under {@link #SPLIT_MAXSIZE}, or
     *         {@code Long.MAX_VALUE} when the key is not set
     */
    public static long getMaxSplitSize(JobContext context) {
        final Configuration jobConf = context.getConfiguration();
        final long configuredMax = jobConf.getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
        return configuredMax;
    }

    
    /**
     * Instantiates the path filter registered for the job, if any.
     *
     * @param context the job context whose configuration is consulted
     * @return a new instance of the class stored under
     *         {@link #PATHFILTER_CLASS}, created through
     *         {@code ReflectionUtils.newInstance}, or {@code null} when no
     *         filter class is configured
     */
    public static PathFilter getInputPathFilter(JobContext context) {
        Configuration conf = context.getConfiguration();
        // Typed instead of raw Class: the bound makes the result assignable to
        // PathFilter without an unchecked cast.
        Class<? extends PathFilter> filterClass =
                conf.getClass(PATHFILTER_CLASS, null, PathFilter.class);
        if (filterClass == null) {
            return null;
        }
        return ReflectionUtils.newInstance(filterClass, conf);
    }

    
    /**
     * Lists the input files of the job.
     *
     * <p>The input paths come from {@code getInputPaths(job)}. Every path is
     * filtered through {@code hiddenFileFilter} and, when one is configured,
     * the filter returned by {@code getInputPathFilter(job)}. When
     * {@link #LIST_STATUS_NUM_THREADS} resolves to exactly 1 the listing is
     * done by {@code singleThreadedListStatus}; otherwise it is delegated to a
     * {@code LocatedFileStatusFetcher}.
     *
     * @param job the job context to list input for
     * @return the file statuses that were found
     * @throws IOException if the job has no input paths, if listing fails, or
     *                     if the calling thread is interrupted while the
     *                     fetcher is running
     */
    protected List<FileStatus> listStatus(JobContext job
    ) throws IOException {
        Path[] dirs = getInputPaths(job);
        if (dirs.length == 0) {
            throw new IOException("No input paths specified in job");
        }

        Configuration conf = job.getConfiguration();

        // get tokens for all the required FileSystems..
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, conf);

        // Whether we need to recursive look into the directory structure
        boolean recursive = getInputDirRecursive(job);

        // creates a MultiPathFilter with the hiddenFileFilter and the
        // user provided one (if any).
        List<PathFilter> filters = new ArrayList<PathFilter>();
        filters.add(hiddenFileFilter);
        PathFilter jobFilter = getInputPathFilter(job);
        if (jobFilter != null) {
            filters.add(jobFilter);
        }
        PathFilter inputFilter = new MultiPathFilter(filters);

        List<FileStatus> result;

        int numThreads = conf.getInt(LIST_STATUS_NUM_THREADS,
                DEFAULT_LIST_STATUS_NUM_THREADS);
        StopWatch sw = new StopWatch().start();
        if (numThreads == 1) {
            result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
        } else {
            Iterable<FileStatus> locatedFiles;
            try {
                LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
                        conf, dirs, recursive, inputFilter, true);
                locatedFiles = locatedFileStatusFetcher.getFileStatuses();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still
                // observe the interruption, and keep the original exception
                // as the cause instead of dropping it.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while getting file statuses", e);
            }
            result = Lists.newArrayList(locatedFiles);
        }

        sw.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Time taken to get FileStatuses: "
                    + sw.now(TimeUnit.MILLISECONDS));
        }
        LOG.info("Total input paths to process : " + result.size());
        return result;
    }

    /**
     * Lists the input files on the calling thread.
     *
     * <p>Each input path is expanded with {@code globStatus}. A matched file
     * is added directly. A matched directory has its children listed; each
     * child accepted by {@code inputFilter} is either descended into (when
     * {@code recursive} is set and the child is a directory) or added as-is.
     * Problems with individual input paths are collected and reported together
     * after all paths have been processed.
     *
     * @param job         supplies the configuration used to resolve file systems
     * @param dirs        the input paths, possibly glob patterns
     * @param inputFilter filter applied to glob matches and directory children
     * @param recursive   whether sub-directories of a matched directory are
     *                    descended into
     * @return the file statuses that were found
     * @throws IOException           if a file-system call fails
     * @throws InvalidInputException if any input path does not exist or a
     *                               pattern matches no files
     */
    private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
                                                      PathFilter inputFilter, boolean recursive) throws IOException {
        List<FileStatus> result = new ArrayList<FileStatus>();
        List<IOException> errors = new ArrayList<IOException>();
        for (Path p : dirs) {
            FileSystem fs = p.getFileSystem(job.getConfiguration());
            FileStatus[] matches = fs.globStatus(p, inputFilter);
            if (matches == null) {
                errors.add(new IOException("Input path does not exist: " + p));
            } else if (matches.length == 0) {
                errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
            } else {
                for (FileStatus globStat : matches) {
                    if (globStat.isDirectory()) {
                        // Typed iterator: with a raw RemoteIterator, next()
                        // yields Object and the assignment below cannot compile.
                        RemoteIterator<LocatedFileStatus> iter =
                                fs.listLocatedStatus(globStat.getPath());
                        while (iter.hasNext()) {
                            LocatedFileStatus stat = iter.next();
                            if (inputFilter.accept(stat.getPath())) {
                                if (recursive && stat.isDirectory()) {
                                    addInputPathRecursively(result, fs, stat.getPath(),
                                            inputFilter);
                                } else {
                                    result.add(stat);
                                }
                            }
                        }
                    } else {
                        result.add(globStat);
                    }
                }
            }
        }

        // Report every bad input path at once rather than failing on the first.
        if (!errors.isEmpty()) {
            throw new InvalidInputException(errors);
        }
        return result;
    }

    
    /**
     * Adds every file below {@code path} that passes {@code inputFilter} to
     * {@code result}, descending into accepted sub-directories.
     *
     * <p>A directory rejected by the filter is skipped together with all of
     * its contents.
     *
     * @param result      the list that accepted file statuses are appended to
     * @param fs          the file system used to list {@code path}
     * @param path        the directory to list
     * @param inputFilter filter applied to every child path
     * @throws IOException if listing fails
     */
    protected void addInputPathRecursively(List<FileStatus> result,
                                           FileSystem fs, Path path, PathFilter inputFilter)
            throws IOException {
        // Typed iterator: with a raw RemoteIterator, next() yields Object and
        // the assignment below cannot compile.
        RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
        while (iter.hasNext()) {
            LocatedFileStatus stat = iter.next();
            if (inputFilter.accept(stat.getPath())) {
                if (stat.isDirectory()) {
                    addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
                } else {
                    result.add(stat);
                }
            }
        }
    }


    
    /**
     * Factory for the {@code FileSplit} objects produced by {@code getSplits}
     * when only the host list is known.
     *
     * @param file   the file the split belongs to
     * @param start  offset of the first byte of the split
     * @param length number of bytes in the split
     * @param hosts  hosts passed through to the {@code FileSplit} constructor
     * @return a new {@code FileSplit} built from the arguments
     */
    protected FileSplit makeSplit(Path file, long start, long length,
                                  String[] hosts) {
        final FileSplit created = new FileSplit(file, start, length, hosts);
        return created;
    }

    
    /**
     * Factory for the {@code FileSplit} objects produced by {@code getSplits}
     * when both the host list and the in-memory host list are known.
     *
     * @param file          the file the split belongs to
     * @param start         offset of the first byte of the split
     * @param length        number of bytes in the split
     * @param hosts         hosts passed through to the {@code FileSplit}
     *                      constructor
     * @param inMemoryHosts second host list passed through to the
     *                      {@code FileSplit} constructor; {@code getSplits}
     *                      supplies a block's cached hosts here
     * @return a new {@code FileSplit} built from the arguments
     */
    protected FileSplit makeSplit(Path file, long start, long length,
                                  String[] hosts, String[] inMemoryHosts) {
        final FileSplit created =
                new FileSplit(file, start, length, hosts, inMemoryHosts);
        return created;
    }

    
    /**
     * Generates the input splits for the job.
     *
     * <p>For each file returned by {@code listStatus}:
     * <ul>
     *   <li>a zero-length file yields one empty split with no hosts;</li>
     *   <li>a file for which {@code isSplitable} returns {@code false} yields
     *       one split covering the whole file, located at its first block;</li>
     *   <li>otherwise the file is cut into pieces of
     *       {@code computeSplitSize(blockSize, minSize, maxSize)} bytes while
     *       the remaining bytes exceed {@code SPLIT_SLOP} times that size, and
     *       whatever remains becomes the final split.</li>
     * </ul>
     * The number of listed files is stored in the configuration under
     * {@link #NUM_INPUT_FILES}.
     *
     * @param job the job context
     * @return the generated splits
     * @throws IOException if listing files or fetching block locations fails
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = new StopWatch().start();
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        // Element type restored: with a raw List the typed for-each below
        // cannot compile.
        List<FileStatus> files = listStatus(job);
        for (FileStatus file : files) {
            Path path = file.getPath();
            long length = file.getLen();
            if (length != 0) {
                BlockLocation[] blkLocations;
                if (file instanceof LocatedFileStatus) {
                    // The listing already carried the block locations.
                    blkLocations = ((LocatedFileStatus) file).getBlockLocations();
                } else {
                    FileSystem fs = path.getFileSystem(job.getConfiguration());
                    blkLocations = fs.getFileBlockLocations(file, 0, length);
                }
                if (isSplitable(job, path)) {
                    long blockSize = file.getBlockSize();
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                    long bytesRemaining = length;
                    // Carve full-size splits; stop once the tail is within the
                    // slop factor so it is emitted as a single last split.
                    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                        bytesRemaining -= splitSize;
                    }

                    if (bytesRemaining != 0) {
                        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                    }
                } else { // not splitable
                    splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                            blkLocations[0].getCachedHosts()));
                }
            } else {
                //Create empty hosts array for zero length files
                splits.add(makeSplit(path, 0, length, new String[0]));
            }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                    + ", Timetaken: " + sw.now(TimeUnit.MILLISECONDS));
        }
        return splits;
    }

    /**
     * Picks the split size for a file: the block size capped at
     * {@code maxSize}, then raised to at least {@code minSize}. When
     * {@code minSize} exceeds {@code maxSize}, {@code minSize} wins.
     *
     * @param blockSize the file's block size
     * @param minSize   the lower bound
     * @param maxSize   the upper bound applied before the lower bound
     * @return {@code max(minSize, min(maxSize, blockSize))}
     */
    protected long computeSplitSize(long blockSize, long minSize,
                                    long maxSize) {
        final long capped = (blockSize < maxSize) ? blockSize : maxSize;
        return (capped > minSize) ? capped : minSize;
    }

    /**
     * Finds the block that contains the given file offset.
     *
     * @param blkLocations the block locations to search, in array order
     * @param offset       the file offset to look up
     * @return the index of the first block whose range
     *         {@code [getOffset(), getOffset() + getLength())} contains
     *         {@code offset}
     * @throws IllegalArgumentException if no block contains {@code offset},
     *                                  including when {@code blkLocations} is
     *                                  empty
     */
    protected int getBlockIndex(BlockLocation[] blkLocations,
                                long offset) {
        for (int i = 0 ; i < blkLocations.length; i++) {
            // is the offset inside this block?
            if ((blkLocations[i].getOffset() <= offset) &&
                    (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
                return i;
            }
        }
        // With no blocks there is no "last block" to describe; indexing
        // blkLocations[-1] would surface as ArrayIndexOutOfBoundsException
        // instead of the documented IllegalArgumentException.
        if (blkLocations.length == 0) {
            throw new IllegalArgumentException("Offset " + offset +
                    " is outside of file (no block locations)");
        }
        BlockLocation last = blkLocations[blkLocations.length -1];
        long fileLength = last.getOffset() + last.getLength() -1;
        throw new IllegalArgumentException("Offset " + offset +
                " is outside of file (0.." +
                fileLength + ")");
    }

    
    /**
     * Replaces the job's input paths with the ones given as a single
     * comma-separated string.
     *
     * <p>The string is split by {@code getPathStrings}, converted to paths
     * with {@code StringUtils.stringToPath}, and handed to the
     * {@code Path...} overload.
     *
     * @param job                 the job to modify
     * @param commaSeparatedPaths the input paths, separated by commas
     * @throws IOException if the {@code Path...} overload fails
     */
    public static void setInputPaths(Job job,
                                     String commaSeparatedPaths
    ) throws IOException {
        final String[] pieces = getPathStrings(commaSeparatedPaths);
        final Path[] parsed = StringUtils.stringToPath(pieces);
        setInputPaths(job, parsed);
    }

    
    /**
     * Appends the paths given as a single comma-separated string to the job's
     * input paths, one {@code addInputPath} call per path in order.
     *
     * @param job                 the job to modify
     * @param commaSeparatedPaths the paths to add, separated by commas
     * @throws IOException if {@code addInputPath} fails for any path
     */
    public static void addInputPaths(Job job,
                                     String commaSeparatedPaths
    ) throws IOException {
        final String[] pieces = getPathStrings(commaSeparatedPaths);
        for (int k = 0; k < pieces.length; k++) {
            final Path next = new Path(pieces[k]);
            addInputPath(job, next);
        }
    }

    
    /**
     * Replaces the job's input paths with the given paths.
     *
     * <p>Each path is qualified against its own file system, escaped with
     * {@code StringUtils.escapeString}, and the results are joined with
     * {@code StringUtils.COMMA_STR} and stored under {@link #INPUT_DIR}.
     *
     * @param job        the job to modify
     * @param inputPaths the input paths; at least one is required, because the
     *                   first element is read unconditionally
     * @throws IOException if resolving a path's file system fails
     */
    public static void setInputPaths(Job job,
                                     Path... inputPaths) throws IOException {
        Configuration conf = job.getConfiguration();
        Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
        // StringBuilder instead of StringBuffer: the builder never leaves this
        // method, so StringBuffer's synchronization buys nothing.
        StringBuilder str = new StringBuilder(StringUtils.escapeString(path.toString()));
        for (int i = 1; i < inputPaths.length; i++) {
            str.append(StringUtils.COMMA_STR);
            path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
            str.append(StringUtils.escapeString(path.toString()));
        }
        conf.set(INPUT_DIR, str.toString());
    }

    
    /**
     * Appends one path to the job's input paths.
     *
     * <p>The path is qualified against its file system and escaped. It then
     * becomes the value of {@link #INPUT_DIR} when the key is unset, or is
     * appended to the existing value after a comma otherwise.
     *
     * @param job  the job to modify
     * @param path the path to add
     * @throws IOException if resolving the path's file system fails
     */
    public static void addInputPath(Job job,
                                    Path path) throws IOException {
        final Configuration jobConf = job.getConfiguration();
        final Path qualified = path.getFileSystem(jobConf).makeQualified(path);
        final String escaped = StringUtils.escapeString(qualified.toString());
        final String existing = jobConf.get(INPUT_DIR);
        final String updated;
        if (existing == null) {
            updated = escaped;
        } else {
            updated = existing + "," + escaped;
        }
        jobConf.set(INPUT_DIR, updated);
    }

    // This method escapes commas in the glob pattern of the given paths.
    private static String[] getPathStrings(String commaSeparatedPaths) {
        int length = commaSeparatedPaths.length();
        int curlyOpen = 0;
        int pathStart = 0;
        boolean globPattern = false;
        List pathStrings = new ArrayList();

        for (int i=0; i
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/630226.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号