深入理解 Spark(二):Spark on YARN

总览

从 Spark 的角度看集群,集群的角色分为:

  • Driver
  • Executor

从 YARN 的角度看集群,集群的角色分为:

  • Client
  • ApplicationMaster
  • Container

Client 模式:

YARN Client Mode

Cluster 模式:

YARN Cluster Mode

Client 模式与 Cluster 模式的区别:在 Client 模式下 Spark Driver 在 Client 启动,在 Cluster 模式下 Spark Driver 在 ApplicationMaster 启动

Client

org.apache.spark.deploy.yarn.Client 类 Spark YARN 客户端,在 run 方法内,实现了向 YARN 集群提交应用:

this.appId = submitApplication()  

下面详细解析提交应用的过程:

第一步,初始化并启动 YarnClient:

yarnClient.init(hadoopConf)  
yarnClient.start()  

第二步,创建应用并获取应用 ID:

val newApp = yarnClient.createApplication()  
val newAppResponse = newApp.getNewApplicationResponse()  
appId = newAppResponse.getApplicationId()  

第三步,建立运行 ApplicationMaster Container 的 ContainerLaunchContext

应用运行依赖的环境变量:

val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)  
amContainer.setEnvironment(launchEnv.asJava)  

准备本地资源文件,上传到 HDFS:

val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)  
amContainer.setLocalResources(localResources.asJava)  

启动 ApplicationMaster 的 Shell 命令:

val commands = prefixEnv ++  
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
amContainer.setCommands(printableCommands.asJava)  

Client 模式与 Cluster 模式下主类是不同的:

val amClass =  
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }

第四步,建立提交 ApllicationMaster 的 ApplicationSubmissionContext

应用的信息:

appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))  
appContext.setQueue(sparkConf.get(QUEUE_NAME))  
appContext.setAMContainerSpec(containerContext)  
appContext.setApplicationType("SPARK")  

分配的资源:

val capability = Records.newRecord(classOf[Resource])  
capability.setMemory(amMemory + amMemoryOverhead)  
capability.setVirtualCores(amCores)  
appContext.setResource(capability)  

最后,提交应用:

yarnClient.submitApplication(appContext)  

ApplicationMaster

org.apache.spark.deploy.yarn.ApplicationMaster

org.apache.spark.deploy.yarn.ExecutorLauncher

Executor

TODO

参考

  • 《Hadoop 技术内幕:深入解析 YARN 架构设计与实现原理》,机械工业出版社,董西成