
Detailed explanation of Spark speculation

2022-06-25 11:21:00 pyiran

Why speculation

As we all know, in a Spark job, when a stage finishes depends on when the last task in that stage completes. A task's completion time is in turn affected by many factors, such as the distribution of partitions, executor resource usage, the operating state of the host, the cluster network, and so on. In many cases a task runs slowly only because of its runtime environment, and simply running the task again can alleviate the problem, so Spark supports a speculation feature. This article explains in detail what Spark speculation is.

Spark.Speculation

In Spark's configuration, the parameters related to speculation are as follows:

property name | default | meaning
spark.speculation | false | If set to "true", speculative execution of tasks is performed; that is, slow tasks within a stage get a chance to be relaunched.
spark.speculation.interval | 100ms | How often Spark checks for tasks to speculate.
spark.speculation.multiplier | 1.5 | How many times a task's running time must exceed the median running time of completed tasks (the threshold) before the task is considered for relaunch.
spark.speculation.quantile | 0.75 | The fraction of tasks in a stage that must be complete before speculation is enabled for that stage.

Note that the speculation mechanism operates per stage: tasks in different stages do not affect each other, and only currently running tasks are considered. When speculative execution kicks in, Spark takes the result of the first copy of the task that finishes and marks the task as completed. A minimal configuration example is shown below.
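As a quick illustration, these parameters can be set when building the SparkSession (or passed with --conf to spark-submit). This is only a minimal sketch; the application name and the non-default values are placeholders, not recommendations:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("speculation-demo")                   // placeholder name
  .config("spark.speculation", "true")           // enable speculative execution
  .config("spark.speculation.interval", "100ms") // how often to check for slow tasks
  .config("spark.speculation.multiplier", "1.5") // threshold = 1.5 x median task duration
  .config("spark.speculation.quantile", "0.75")  // check only after 75% of a stage's tasks finish
  .getOrCreate()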

Speculation Workflow

From Spark's configuration parameters for speculation, it is not hard to infer Spark's speculation workflow, sketched below.

[Figure: Spark speculation workflow]
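In words: roughly every spark.speculation.interval, Spark checks each running task set; once spark.speculation.quantile of its tasks have finished, any still-running task whose elapsed time exceeds spark.speculation.multiplier times the median duration of the finished tasks gets a speculative copy launched, and the first copy to finish wins. The sketch below is a self-contained simplification of that check; TaskState and findSpeculatableTasks are invented for illustration and are not Spark API (the real implementation follows in the next section):

// Simplified illustration of the speculation check; not the actual Spark code.
case class TaskState(index: Int, elapsedMs: Long, finished: Boolean)

def findSpeculatableTasks(tasks: Seq[TaskState], quantile: Double, multiplier: Double): Seq[Int] = {
  val finished = tasks.filter(_.finished)
  if (finished.nonEmpty && finished.size >= (quantile * tasks.size).floor.toInt) {
    val durations = finished.map(_.elapsedMs).sorted
    val median = durations(durations.size / 2)
    val threshold = multiplier * median
    // Running tasks whose elapsed time already exceeds the threshold get a speculative copy.
    tasks.filter(t => !t.finished && t.elapsedMs > threshold).map(_.index)
  } else Seq.empty
}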

Spark Source code

The start function of the TaskScheduler, which is called during SparkContext initialization:

override def start() {
  backend.start()

  // Speculation is not used in local mode.
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    // Periodically check for speculatable tasks, once every SPECULATION_INTERVAL_MS.
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}

In TaskSetManager, detecting the tasks for which the speculation mechanism needs to be started:

/**
 * Check for tasks to be speculated and return true if there are any. This is called periodically
 * by the TaskScheduler.
 */
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
  // Can't speculate if we only have one task, and no need to speculate if the task set is a
  // zombie.
  if (isZombie || numTasks == 1) {
    return false
  }
  var foundTasks = false
  val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
  logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)

  if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
    val time = clock.getTimeMillis()
    val medianDuration = successfulTaskDurations.median
    val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
    // TODO: Threshold should also look at standard deviation of task durations and have a lower
    // bound based on that.
    logDebug("Task length threshold for speculation: " + threshold)
    for (tid <- runningTasksSet) {
      val info = taskInfos(tid)
      val index = info.index
      if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
        !speculatableTasks.contains(index)) {
        logInfo(
          "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
            .format(index, taskSet.id, info.host, threshold))
        speculatableTasks += index
        sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
        foundTasks = true
      }
    }
  }
  foundTasks
}
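To make the threshold concrete, here is a small worked example using the default settings; the task count, the median duration, and the 100 ms lower bound for minTimeToSpeculation are assumptions for illustration only:

// Hypothetical stage of 100 tasks with the default speculation settings.
val numTasks = 100
val speculationQuantile = 0.75
val speculationMultiplier = 1.5
val minTimeToSpeculation = 100 // ms, assumed lower bound passed to checkSpeculatableTasks

val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt // 75
val medianDuration = 60000.0 // assume the median successful task took 60 s
val threshold = math.max(speculationMultiplier * medianDuration, minTimeToSpeculation) // 90000 ms
// Once 75 tasks have succeeded, any task still running after 90 s is marked speculatable.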

The task re-launch is handled by the DAGScheduler, so it is done through the event mechanism:

/**
 * Called by the TaskSetManager when it decides a speculative task is needed.
 */
def speculativeTaskSubmitted(task: Task[_]): Unit = {
  eventProcessLoop.post(SpeculativeTaskSubmitted(task))
}

References

https://spark.apache.org/docs/latest/configuration.html
