Spark on Yarn
client
Cluster
本质区别,driver位置不同
1)有哪些不同的进程?
2)分别有什么作用?
3)Spark作业执行流程是什么样的
跑cluster模式有
--master yarn --deploy-mode cluster 时的进程: SparkSubmit、ApplicationMaster、CoarseGrainedExecutorBackend(默认executor有两个, 即两个CoarseGrainedExecutorBackend进程)
跑client模式有
--master yarn --deploy-mode client 时的进程: SparkSubmit、ExecutorLauncher、CoarseGrainedExecutorBackend(默认executor有两个, 即两个CoarseGrainedExecutorBackend进程)
应用起来之后,client模式不能杀死SparkSubmit(driver跑在SparkSubmit进程里,杀掉作业就失败); cluster模式可以杀死SparkSubmit(driver跑在ApplicationMaster里,不受影响)
Spark源码流程SparkSubmit.main{
调用: doSubmit
val submit = new SparkSubmit()
submit.doSubmit(args) {
val appArgs = parseArguments(args) {
mergeDefaultSparkProperties() // 合并default配置文件中的参数
ignoreNonSparkProperties()
loadEnvironmentArguments(){
action = Option(action).getOrElse(SUBMIT)
}
}
submit(appArgs, uninitLog){
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args){
if(deployMode == CLIENT){ // 如果选用client模式
childMainClass = args.mainClass // 用户指定 类
}
if(deployMode == CLUSTER){ // 如果选用cluster模式
childMainClass = org.apache.spark.deploy.yarn.YarnClusterApplication // 固定类
}
}
var mainClass = Utils.classForName(childMainClass) // 反射加载主类
val app : SparkApplication
app.start(childArgs.toArray, SparkConf){
new Client(new ClientArguments(args),conf).run(){
this.appId = submitApplication() // 提交应用程序, 日志: "Requesting a new application from cluster with 1 NodeManagers" ====》 获取yarn的连接状态
val containerContext = createContainerLaunchContext(newAppResponse) // client.scala
{
val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName // cluster模式是ApplicationMaster
}
else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName // client是executorLauncher
}
}
val appcontext = createApplicationSubmissionContext(newApp, containerContext) // client.scala
}
}
}
}
SparkSubmitArguments ==> 就是Submit提交时解析出的参数的包装类
参数转Java
applicationMaster
ApplicationMaster.main{
master = new ApplicationMaster(amArgs)
master.run()
runImpl(){
if(isClusterMode){
runDriver(){
userClassThread = startUserApplication(){
val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main",classOf[Array[String]])
mainMethod.invoke(null, userArgs.toArray)
}
val sc = ThreadUtils.awaitResult(sparkContextPromise.future, ...)
registerAm(host,port,userConf,sc.ui.map(_.webUrl))
createAllocator(driverRef, userConf){
allocator.allocateResources(){
handleAllocatoedContainers(allocatedContainers.asScala){
runAllocatedContainers(containersToUse){
//启动container
for(container <- containersToUse){ // 两个container 启动
launcherPool.execute(Thread).run(){
val commands = prepareCommand(){
org.apache.spark.executor.CoarseGrainedExecutorBackend
}
nmClient.startContainer(container.get, ctx)
}
}
}
}
}else{
runExecutorLauncher()
}
}
}
SparkContext
SparkContext{
private var _schedulerBackend
private var _taskScheduler}



