栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop之MapReduce04【客户端源码分析】,linux视频教程下载

Hadoop之MapReduce04【客户端源码分析】,linux视频教程下载

while (!isComplete()) {

try {

// 间隔判断是否执行完成

Thread.sleep(completionPollIntervalMillis);

} catch (InterruptedException ie) {

}

}

}

return isSuccessful();

}

3.2submit

进入submit方法查看

public void submit()

throws IOException, InterruptedException, ClassNotFoundException {

// 再次确认任务状态

ensureState(JobState.DEFINE);

// 默认使用new APIs

setUseNewAPI();

// 初始化cluster对象

connect();

// 根据初始化得到的cluster对象生成JobSubmitter对象

final JobSubmitter submitter =

getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

//

status = ugi.doAs(new PrivilegedExceptionAction() {

public JobStatus run() throws IOException, InterruptedException,

ClassNotFoundException {

// 进入 submitJobInternal 方法查看

return submitter.submitJobInternal(Job.this, cluster);

}

});

//将job的状态设置为RUNNING

state = JobState.RUNNING;

LOG.info("The url to track the job: " + getTrackingURL());

}

3.3 submitJobInternal

JobStatus submitJobInternal(Job job, Cluster cluster)

throws ClassNotFoundException, InterruptedException, IOException {

//检查job的输出空间

checkSpecs(job);

Configuration conf = job.getConfiguration();

// 将MapReduce框架加入分布式缓存中

addMRframeworkToDistributedCache(conf);

// 初始化job的工作根目录并返回path路径

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

//configure the command line options correctly on the submitting dfs

InetAddress ip = InetAddress.getLocalHost();

if (ip != null) {

submitHostAddress = ip.getHostAddress();

submitHostName = ip.getHostName();

conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);

conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);

}

// 为job分配一个名字

JobID jobId = submitClient.getNewJobID();

job.setJobID(jobId);

// 获得job的提交路径,也就是在jobStagingArea目录下建一个以jobId为文件名的目录

Path submitJobDir = new Path(jobStagingArea, jobId.toString());

JobStatus status = null;

// 进行一系列的配置

try {

conf.set(MRJobConfig.USER_NAME,

UserGroupInformation.getCurrentUser().getShortUserName());

conf.set(“hadoop.http.filter.initializers”,

“org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer”);

conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());

LOG.debug("Configuring job " + jobId + " with " + submitJobDir

  • " as the submit dir");

// get delegation token for the dir

TokenCache.obtainTokensForNamenodes(job.getCredentials(),

new Path[] { submitJobDir }, conf);

populateTokenCache(conf, job.getCredentials());

// generate a secret to authenticate shuffle transfers

if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {

KeyGenerator keyGen;

try {

keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);

keyGen.init(SHUFFLE_KEY_LENGTH);

} catch (NoSuchAlgorithmException e) {

throw new IOException(“Error generating shuffle secret key”, e);

}

SecretKey shuffleKey = keyGen.generateKey();

TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),

job.getCredentials());

}

// 这个方法实现文件上传

copyAndConfigureFiles(job, submitJobDir);

Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

// Create the splits for the job

LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));

// 方法内部会根据我们之前的设置,选择使用new-api还是old-api分别进行分片操作

int maps = writeSplits(job, submitJobDir);

conf.setInt(MRJobConfig.NUM_MAPS, maps);

LOG.info(“number of splits:” + maps);

// write “queue admins of the queue to which job is being submitted”

// to job file.

String queue = conf.get(MRJobConfig.QUEUE_NAME,

JobConf.DEFAULT_QUEUE_NAME);

AccessControlList acl = submitClient.getQueueAdmins(queue);

conf.set(toFullPropertyName(queue,

QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

// removing jobtoken referrals before copying the jobconf to HDFS

// as the tasks don’t need this setting, actually they may break

// because of it if present as the referral will point

【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】

浏览器打开:qq.cn.hn/FTf 免费领取

to a

// different job.

TokenCache.cleanUpTokenReferral(conf);

if (conf.getBoolean(

MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,

MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {

// Add HDFS tracking ids

ArrayList trackingIds = new ArrayList();

for (Token t :

job.getCredentials().getAllTokens()) {

trackingIds.add(t.decodeIdentifier().getTrackingId());

}

conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,

trackingIds.toArray(new String[trackingIds.size()]));

}

// 提交规划文件 job.split wc.jar …

writeConf(conf, submitJobFile);

//

// Now, actually submit the job (using the submit name)

// 提交任务

printTokens(jobId, job.getCredentials());

status = submitClient.submitJob(

jobId, submitJobDir.toString(), job.getCredentials());

if (status != null) {

return status;

} else {

throw new IOException(“Could not launch job”);

}

} finally {

if (status == null) {

LOG.info("Cleaning up the staging area " + submitJobDir);

if (jtFs != null && submitJobDir != null)

jtFs.delete(submitJobDir, true);

}

}

}

3.4writeSplits

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,

Path jobSubmitDir) throws IOException,

InterruptedException, ClassNotFoundException {

JobConf jConf = (JobConf)job.getConfiguration();

int maps;

if (jConf.getUseNewMapper()) {

//进入

maps = writeNewSplits(job, jobSubmitDir);

} else {

maps = writeOldSplits(jConf, jobSubmitDir);

}

return maps;

}

3.5writeNewSplits

int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,

InterruptedException, ClassNotFoundException {

Configuration conf = job.getConfiguration();

// 根据我们设置的inputFormat.class通过反射获得inputFormat对象

InputFormat input =

ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

// 获取分片信息

List splits = input.getSplits(job);

T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

// sort the splits into order based on size, so that the biggest

// go first

Arrays.sort(array, new SplitComparator());

// 将分片的信息写入到jobSubmitDir --job.split文件中

JobSplitWriter.createSplitFiles(jobSubmitDir, conf,

jobSubmitDir.getFileSystem(conf), array);

return array.length;

}

3.6 getSplits

public List getSplits(JobContext job) throws IOException {

Stopwatch sw = new Stopwatch().start();

// 最小值

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

// 最大值

long maxSize = getMaxSplitSize(job);

// generate splits

List splits = new ArrayList();

List files = listStatus(job);

for (FileStatus file: files) {

Path path = file.getPath();

long length = file.getLen();

if (length != 0) {

BlockLocation[] blkLocations;

if (file instanceof LocatedFileStatus) {

blkLocations = ((LocatedFileStatus) file).getBlockLocations();

} else {

FileSystem fs = path.getFileSystem(job.getConfiguration());

blkLocations = fs.getFileBlockLocations(file, 0, length);

}

if (isSplitable(job, path)) {

// 获取block大小

long blockSize = file.getBlockSize();

// 获取splitSize大小

long splitSize = computeSplitSize(blockSize, minSize, maxSize);

long bytesRemaining = length;

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

splits.add(makeSplit(path, length-bytesRemaining, splitSize,

blkLocations[blkIndex].getHosts(),

blkLocations[blkIndex].getCachedHosts()));

bytesRemaining -= splitSize;

}

if (bytesRemaining != 0) {

int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,

blkLocations[blkIndex].getHosts(),

blkLocations[blkIndex].getCachedHosts()));

}

} else { // not splitable

splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),

blkLocations[0].getCachedHosts()));

}

} else {

//Create empty hosts array for zero length files

splits.add(makeSplit(path, 0, length, new String[0]));

}

}

// Save the number of input files for metrics/loadgen

job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

sw.stop();

if (LOG.isDebugEnabled()) {

LOG.debug("Total # of splits generated by getSplits: " + splits.size()

  • ", Timetaken: " + sw.elapsedMillis());

}

return splits;

}

3.7computeSplitSize

protected long computeSplitSize(long blockSize, long minSize,

long maxSize) {

return Math.max(minSize, Math.min(maxSize, blockSize));

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/488199.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号