深入理解 Spark 2(一):提交应用

当在命令行下执行 spark-submit 提交任务到 YARN 上的时候,Spark 究竟做了什么???

接下来的深入理解 Spark 2 系列将从源码的角度,探索 Spark 2 内部的实现原理

源码基于当前最新版 Spark 2.3.0

spark-submit.sh

当在命令行下执行:

spark-submit --master yarn --deploy-mode cluster --class com.dyingbleed.spark.Application SparkApp.jar  

第一步,设置 $SPARK_HOME 环境变量,为 spark-submit 脚本所在目录的上一级

export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"  

第二步,调用 spark-class 提交任务

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"  

spark-class.sh

第一步,加载环境变量:

  • conf/spark-env.sh 脚本配置的环境变量
  • $SPARK_SCALA_VERSION 环境变量,使用 Scala 版本
  • $JAVA_HOME 环境变量

第二步,Spark 依赖 Jars 目录,为主目录下 jars 目录

SPARK_JARS_DIR="${SPARK_HOME}/jars"  

第三步,检测参数及环境变量并构建启动命令

执行 org.apache.spark.launcher.Main

对参数进行解析:

AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);  

构建启动命令:

List<String> cmd = builder.buildCommand(env);  

输出命令:

List<String> bashCmd = prepareBashCommand(cmd, env);  
for (String c : bashCmd) {  
    System.out.print(c);
    System.out.print('\0');
}

第四步,执行任务

执行 org.apache.spark.deploy.SparkSubmit

SparkSubmit

第一步,解析参数并准备提交环境

解析参数:

val appArgs = new SparkSubmitArguments(args)  

准备提交环境:

val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)  

其中:

  • childArgs 应用参数
  • childClasspath 依赖 Jar 路径
  • sparkConf Spark 配置,详情见类 rg.apache.spark.SparkConf
  • childMainClass 入口类,如果是 YARN Cluster 模式,为 org.apache.spark.deploy.yarn.YarnClusterApplication,如果是 Yarn Client 模式,为应用主类类名

第二步,执行主类 main 函数

runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)  

参数 verbose 的作用,官网如下解释:

If you are ever unclear where configuration options are coming from, you can print out fine-grained debugging information by running spark-submit with the --verbose option.

创建 ClassLoader:

val loader =  
  if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
    new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
  } else {
    new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
  }
Thread.currentThread.setContextClassLoader(loader)  

获取主类:

mainClass = Utils.classForName(childMainClass)  

创建 SparkApplication 实例:

app = mainClass.newInstance().asInstanceOf[SparkApplication] // YARN Cluster 模式  
app = new JavaMainApplication(mainClass) // YARN Client 模式  

启动 Spark 应用:

app.start(childArgs.toArray, sparkConf)