• 注册
  • 后端开发博客 后端开发博客 关注:0 内容:702

    Spark 提交任务 源码解析 (一)

  • 查看作者
  • 打赏作者
  • 当前位置: 职业司 > 后端开发 > 后端开发博客 > 正文
    • 后端开发博客
    • 这是我参与更文挑战的第10天,活动详情查看:更文挑战

      最令人头秃的就是看源码

      Spark 提交任务 源码解析 (一)

      环境

      • 本次使用的spark版本是 3.0.0
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.0.0</version>
      </dependency>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-yarn_2.12</artifactId>
      <version>3.0.0</version>
      <scope>provided</scope>
      </dependency>
      复制代码

      1.这是我们提交任务的脚本

      bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master yarn \
      --deploy-mode client \
      ./examples/jars/spark-examples_2.12-3.0.0.jar
      复制代码

      2.咱们看下 spark-submit 脚本代码

      if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
      fi
      # disable randomized hash for string in Python 3.3+
      export PYTHONHASHSEED=0
      # 重点代码 
      exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
      复制代码

      可以看出 脚本中执行了 spark-class 脚本 ,参数为:org.apache.spark.deploy.SparkSubmit "$@"

      3.继续跟踪 spark-class 脚本代码

      # 下方是部分源码
      # 找到最后的代码 看到执行了CMD的值
      CMD=("${CMD[@]:0:$LAST}")
      exec "${CMD[@]}"
      # 接下来找到CMD是那来的?
      # 下面代码是组装参数
      CMD=()
      DELIM=$'\n'
      CMD_START_FLAG="false"
      while IFS= read -d "$DELIM" -r ARG; do
      if [ "$CMD_START_FLAG" == "true" ]; then
      CMD+=("$ARG")
      else
      if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=''
      CMD_START_FLAG="true"
      elif [ "$ARG" != "" ]; then
      echo "$ARG"
      fi
      fi
      done < <(build_command "$@")
      # 这里调用了 build_command 方法
      # -Xmx128m 是不是很熟悉,这里拼接成了一个Java启动命令
      # 通过运行这个类 org.apache.spark.launcher.Main 拼接好启动命令
      build_command() {
      "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
      printf "%d\0" $?
      }
      # 拼接完启动命令后由 exec "${CMD[@]}" 来执行
      复制代码

      org.apache.spark.launcher.Main 一大堆骚操作之后,会生成启动命令,命令会运行 org.apache.spark.deploy.SparkSubmit

      Spark 提交任务 源码解析 (一)

      4.接下来看 org.apache.spark.deploy.SparkSubmit

      • 我们先找到 main 方法
        override def main(args: Array[String]): Unit = {
      //这里声明了一个匿名内部类
      val submit = new SparkSubmit() {
      .......
      }
      //调用匿名内部类方法
      submit.doSubmit(args)
      }
      复制代码
      • 接下来看 SparkSubmit 匿名内部类 里做了什么
        • 重写了 parseArguments 在调用父类doSubmit后,会被调用
        • doSubmit被调用后会调用父类的doSubmit
      val submit = new SparkSubmit() {
      self =>
      //这里重写了 parseArguments 在调用父类doSubmit后,会被调用
      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
      override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
      override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
      override protected def logError(msg: => String): Unit = self.logError(msg)
      }
      }
      override protected def logInfo(msg: => String): Unit = printMessage(msg)
      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
      override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
      //doSubmit被调用后会调用父类的doSubmit
      override def doSubmit(args: Array[String]): Unit = {
      try {
      super.doSubmit(args)
      } catch {
      case e: SparkUserAppException =>
      exitFn(e.exitCode)
      }
      }
      }
      复制代码
        def doSubmit(args: Array[String]): Unit = {
      val uninitLog = initializeLogIfNecessary(true, silent = true)
      // parseArguments 调用了上方 匿名内部类 重写的方法
      val appArgs = parseArguments(args)
      if (appArgs.verbose) {
      logInfo(appArgs.toString)
      }
      appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
      }
      }
      复制代码
      • parseArguments 方法做了什么尼?它创建了 SparkSubmitArguments 用来解析参数
      • 看看SparkSubmitArguments关键源码
      // 由于scala会把类中所有代码都走一便,有几百行,就不一一逼逼了
      // 直接找到108行 
      parse(args.asJava)
      //这方法主要是解析参数  类如 --class --master 啥的
      //其中 handle 是重点 ,其他都是解析字符串啥的
      protected def parse(args: util.List[String]): Unit = {
      val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")
      var idx = 0
      idx = 0
      while ( {
      idx < args.size
      }) {
      var arg = args.get(idx)
      var value = null
      val m = eqSeparatedOpt.matcher(arg)
      if (m.matches) {
      arg = m.group(1)
      value = m.group(2)
      }
      var name = findCliOption(arg, opts)
      if (name != null) {
      if (value == null) {
      if (idx == args.size - 1) throw new IllegalArgumentException(String.format("Missing argument for option '%s'.", arg))
      idx += 1
      value = args.get(idx)
      }
      // 重点方法 handle 
      if (!handle(name, value)) break
      continue
      }
      name = findCliOption(arg, switches)
      if (name != null) {
      if (!handle(name, null)) break
      continue
      }
      if (!handleUnknown(arg)) break
      idx += 1
      }
      if (idx < args.size) idx += 1
      handleExtraArgs(args.subList(idx, args.size))
      }
      //handle方法是父类定义的 在 `SparkSubmitArguments` 重写
      //父类代码
      protected boolean handle(String opt, String value) {
      //这种直接抛异常的,基本都是需要子类重写,直接找子类就行
      throw new UnsupportedOperationException();
      }
      //子类代码 在 `SparkSubmitArguments`中
      override protected def handle(opt: String, value: String): Boolean = {
      opt match {
      case NAME =>
      name = value
      //--master
      case MASTER =>
      master = value
      //--class
      case CLASS =>
      mainClass = value
      case DEPLOY_MODE =>
      if (value != "client" && value != "cluster") {
      error("--deploy-mode must be either \"client\" or \"cluster\"")
      }
      deployMode = value
      //此处省略一些代码,很多参数,具体可自行查看
      case _ =>
      error(s"Unexpected argument '$opt'.")
      }
      action != SparkSubmitAction.PRINT_VERSION
      }
      复制代码

      Spark 提交任务 源码解析 (一)

      • 接下来我们继续看没有执行完的 doSubmit
        def doSubmit(args: Array[String]): Unit = {
      val uninitLog = initializeLogIfNecessary(true, silent = true)
      // 上面讲述了 parseArguments 都做了些啥
      val appArgs = parseArguments(args)
      if (appArgs.verbose) {
      logInfo(appArgs.toString)
      }
      // 接下来继续
      // 这里用的了模式匹配
      // 默认情况下啥 SUBMIT (为啥默认SUBMIT,去 SparkSubmitArguments 看,在227行: action = Option(action).getOrElse(SUBMIT))
      // 可以穿参数 指定
      appArgs.action match {
      //接下来走 submit方法
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
      }
      }
      复制代码

      5.submit(appArgs, uninitLog)

        private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
      def doRunMain(): Unit = {
      if (args.proxyUser != null) {
      //我们参数没指定,所以走else,代码省略。。。可自行查看
      } else {
      runMain(args, uninitLog)
      }
      }
      if (args.isStandaloneCluster && args.useRest) {
      //Standalone提交方式
      //代码省略。。。
      } else {
      //我们这里使用yarn 所以走else
      //这里会调用上面的doRunMain
      doRunMain()
      }
      }
      复制代码

      进入runMain 方法,只保留关键代码,全部自己行查看

      private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
      //执行完下面这行代码得到 childMainClass="org.apache.spark.deploy.yarn.YarnClusterApplication"
      val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
      ......
      //这里是获取spark运行环境,用来给yarn发送用
      val loader = getSubmitClassLoader(sparkConf)
      for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
      }
      ......
      //这里拿到 org.apache.spark.deploy.yarn.YarnClusterApplication 的 class
      var mainClass: Class[_] = null
      try {
      mainClass = Utils.classForName(childMainClass)
      } catch {
      ......
      }
      //这里判断 mainClass 是否继承了 SparkApplication
      //YarnClusterApplication 继承了 SparkApplication
      val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      //所以会走这里 创建YarnClusterApplication对象
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
      } else {
      new JavaMainApplication(mainClass)
      }
      .........
      try {
      //启动
      app.start(childArgs.toArray, sparkConf)
      } catch {
      case t: Throwable =>
      throw findCause(t)
      }
      }
      //这方法几百行省略了一些
      private[deploy] def prepareSubmitEnvironment(
      args: SparkSubmitArguments,
      conf: Option[HadoopConfiguration] = None)
      : (Seq[String], Seq[String], SparkConf, String) = {
      .......
      //重点 关注 YARN_CLUSTER_SUBMIT_CLASS
      //private[deploy] val YARN_CLUSTER_SUBMIT_CLASS ="org.apache.spark.deploy.yarn.YarnClusterApplication"
      // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
      if (isYarnCluster) {
      //取 常量 YARN_CLUSTER_SUBMIT_CLASS
      //可得 childMainClass="org.apache.spark.deploy.yarn.YarnClusterApplication"
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
      ......
      }
      .......
      (childArgs, childClasspath, sparkConf, childMainClass)
      }
      复制代码

      6. 接下来 YarnClusterApplicationstart 方法

      肝不动了,等第二章.....很快

      Spark 提交任务 源码解析 (一)

      请登录之后再进行评论

      登录

      手机阅读天地(APP)

      • 微信公众号
      • 微信小程序
      • 安卓APP
      手机浏览,惊喜多多
      匿名树洞,说我想说!
      问答悬赏,VIP可见!
      密码可见,回复可见!
      即时聊天、群聊互动!
      宠物孵化,赠送礼物!
      动态像框,专属头衔!
      挑战/抽奖,金币送不停!
      赶紧体会下,不会让你失望!
    • 实时动态
    • 签到
    • 做任务
    • 发表内容
    • 偏好设置
    • 到底部
    • 帖子间隔 侧栏位置:
    • 还没有账号?点这里立即注册