深入理解 Spark 2(二):Spark on YARN

总览

从 Spark 的角度看集群,集群的角色分为:

  • Driver
  • Executor

从 YARN 的角度看集群,集群的角色分为:

  • Client
  • ApplicationMaster
  • Container

Client 模式:

YARN Client Mode

Cluster 模式:

YARN Cluster Mode

Client 模式与 Cluster 模式的区别:在 Client 模式下 Spark Driver 在 Client 启动,在 Cluster 模式下 Spark Driver 在 ApplicationMaster 启动

Client

org.apache.spark.deploy.yarn.Client 类 Spark YARN 客户端,在 run 方法内,实现了向 YARN 集群提交应用:

this.appId = submitApplication()  

下面详细解析提交应用的过程:

第一步,初始化并启动 YarnClient:

yarnClient.init(hadoopConf)  
yarnClient.start()  

第二步,创建应用并获取应用 ID:

val newApp = yarnClient.createApplication()  
val newAppResponse = newApp.getNewApplicationResponse()  
appId = newAppResponse.getApplicationId()  

第三步,创建运行 ApplicationMaster Container 的 ContainerLaunchContext 上下文对象

应用运行依赖的环境变量:

val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)  
amContainer.setEnvironment(launchEnv.asJava)  

准备本地资源文件,上传到 HDFS:

val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)  
amContainer.setLocalResources(localResources.asJava)  

启动 ApplicationMaster 的 Shell 命令:

val commands = prefixEnv ++  
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
amContainer.setCommands(printableCommands.asJava)  

第四步,创建提交 ApllicationMaster 的 ApplicationSubmissionContext 上下文对象

应用的信息:

appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))  
appContext.setQueue(sparkConf.get(QUEUE_NAME))  
appContext.setAMContainerSpec(containerContext)  
appContext.setApplicationType("SPARK")  

分配的资源:

val capability = Records.newRecord(classOf[Resource])  
capability.setMemory(amMemory + amMemoryOverhead)  
capability.setVirtualCores(amCores)  
appContext.setResource(capability)  

最后,提交应用:

yarnClient.submitApplication(appContext)  

ApplicationMaster

org.apache.spark.deploy.yarn.ApplicationMaster AM 实现类

第一步,创建 RM 客户端:

private val client = doAsUser { new YarnRMClient() }  

YarnRMClient 内部封装了 org.apache.hadoop.yarn.client.api.AMRMClient

第二步,如果是 Cluster 模式则运行 Driver,如果是 Client 模式则运行 Executor Launcher:

if (isClusterMode) {  
  runDriver()
} else {
  runExecutorLauncher()
}

第三步,Cluster 模式启动 Driver,方法 startUserApplication

获取应用入口方法:

val mainMethod = userClassLoader.loadClass(args.userClass)  
      .getMethod("main", classOf[Array[String]])

在新线程中执行:

mainMethod.invoke(null, userArgs.toArray)  

第四步,注册 Driver 分配资源:

注册 Driver:

allocator = client.register(driverUrl,  
  driverRef,
  yarnConf,
  _sparkConf,
  uiAddress,
  historyAddress,
  securityMgr,
  localResources)

这里,真正对内部封装的 org.apache.hadoop.yarn.client.api.AMRMClient 进行了初始化:

amClient = AMRMClient.createAMRMClient()  
amClient.init(conf)  
amClient.start()  

分配资源:

allocator.allocateResources()  

返回 YarnAllocator 对象,内部封装了 org.apache.hadoop.yarn.client.api.AMRMClient 向 RM 申请资源启动 Executor 操作

参考

  • 《Hadoop 技术内幕:深入解析 YARN 架构设计与实现原理》,机械工业出版社,董西成