
Processing source code of Spark Executor execution results

2022-06-22 16:57:00 ZH519080

Since Spark 1.6, the communication between the Driver's BlockManagerMaster and the BlockManager no longer uses AkkaUtil; it goes through RpcEndpoint instead.

A Spark cluster runs many executors, and each of them has to be maintained and managed. CoarseGrainedExecutorBackend is the Executor process: the Executor needs CoarseGrainedExecutorBackend to maintain and manage it. When CoarseGrainedExecutorBackend starts, it registers with the Driver by sending a RegisterExecutor message. RegisterExecutor is a case class; its source code is as follows:

// Executor information registered with the Driver
case class RegisterExecutor(executorId: String, executorRef: RpcEndpointRef,
    hostPort: String, cores: Int, logUrls: Map[String, String])
  extends CoarseGrainedClusterMessage

The RegisterExecutor registration message is sent from the onStart method of the class CoarseGrainedExecutorBackend:

override def onStart() {
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    driver = Some(ref)
    ref.ask[RegisterExecutorResponse](
      RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    case Success(msg) => Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
    }
    case Failure(e) => {
      logError(s"Cannot register with driver: $driverUrl", e)
      System.exit(1)
    }
  }(ThreadUtils.sameThread)
}

On startup, CoarseGrainedExecutorBackend sends a RegisterExecutor message to the Driver to register. The Driver receives the RegisterExecutor message and, once the Executor has registered successfully, returns a RegisteredExecutor message to CoarseGrainedExecutorBackend. Note that what is registered here has nothing to do with the Executor that actually does the work; what is really being registered is the ExecutorBackend, so RegisterExecutor can be understood as "register the ExecutorBackend" (a minimal sketch of this handshake follows the notes below).

Note:

  1. CoarseGrainedExecutorBackend is the name of the process in which the Executor runs; CoarseGrainedExecutorBackend itself does not perform the computation of tasks.

  2. The Executor is the object that actually processes tasks; internally, it computes Tasks through a thread pool.

  3. CoarseGrainedExecutorBackend and Executor are one-to-one.

  4. CoarseGrainedExecutorBackend is a message communicator: it can receive messages from the Driver, send messages to the Driver, and so on; it inherits from ThreadSafeRpcEndpoint.
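To make the handshake concrete, here is a minimal, self-contained Scala sketch. It is not Spark source: Driver, Backend, and the string placeholder for the Executor below are simplified stand-ins that only model the register/reply protocol and the one-to-one backend/executor relationship described above.

// Simplified stand-ins, NOT Spark's real classes: models only the
// register/reply handshake described above.
object RegistrationHandshakeSketch {
  case class RegisterExecutor(executorId: String, cores: Int)
  sealed trait Response
  case class RegisteredExecutor(hostname: String) extends Response
  case class RegisterExecutorFailed(reason: String) extends Response

  // Stand-in for the Driver side: accepts each executorId at most once.
  class Driver {
    private val executors = scala.collection.mutable.Map.empty[String, Int]
    def ask(msg: RegisterExecutor): Response = synchronized {
      if (executors.contains(msg.executorId)) {
        RegisterExecutorFailed("Duplicate executor ID: " + msg.executorId)
      } else {
        executors(msg.executorId) = msg.cores
        RegisteredExecutor("localhost")
      }
    }
  }

  // Stand-in for CoarseGrainedExecutorBackend: it owns exactly one Executor,
  // created only after the Driver confirms the registration.
  class Backend(executorId: String, driver: Driver) {
    var executor: Option[String] = None // placeholder for the real Executor object
    def onStart(): Unit = driver.ask(RegisterExecutor(executorId, cores = 4)) match {
      case RegisteredExecutor(host) =>
        executor = Some(s"Executor($executorId on $host)")
        println(s"registered: ${executor.get}")
      case RegisterExecutorFailed(reason) =>
        println(s"registration failed: $reason")
    }
  }

  def main(args: Array[String]): Unit = {
    val driver = new Driver
    new Backend("0", driver).onStart() // succeeds; the backend creates its Executor
    new Backend("0", driver).onStart() // duplicate executorId; registration fails
  }
}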

CoarseGrainedExecutorBackend sends the RegisterExecutor message to the Driver, where it is handled by SparkDeploySchedulerBackend (renamed StandaloneSchedulerBackend after Spark 2.0). SparkDeploySchedulerBackend inherits from CoarseGrainedSchedulerBackend, and its start method starts an AppClient (renamed StandaloneAppClient after Spark 2.0). Source code of SparkDeploySchedulerBackend's start method:

override def start() {
  super.start() // call CoarseGrainedSchedulerBackend's start method
  launcherBackend.connect()

  // The endpoint for executors to talk to us
  val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
    RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start Executors by sending the registration information

  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start() // start the AppClient, which registers the application with the Master
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}

There are two important Endpoints in the Driver process:

  1. ClientEndpoint: responsible for registering the current application with the Master; it is an inner member class of AppClient.

  2. DriverEndpoint: the driving endpoint for the execution of the whole application; it receives the RegisterExecutor message and completes the registration on the Driver side. It is an inner member class of CoarseGrainedSchedulerBackend.

The Executor's RegisterExecutor registration message is submitted to DriverEndpoint, which writes the data into executorDataMap, a data structure inside CoarseGrainedSchedulerBackend. CoarseGrainedSchedulerBackend therefore holds all the ExecutorBackend processes allocated to the current application, and inside each ExecutorBackend instance an Executor object is responsible for executing the actual tasks. Between the Executor and CoarseGrainedSchedulerBackend, the RegisterExecutor message is received and answered through the receiveAndReply method, which is very important. The RegisterExecutor registration flow in receiveAndReply:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
    if (executorDataMap.contains(executorId)) { // duplicate registration check
      context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    } else {
      // if executorRef.address is not null, use it as the executorAddress
      val executorAddress = if (executorRef.address != null) {
          executorRef.address
        } else { // otherwise use the sender's address as the executorAddress
          context.senderAddress
        }
      addressToExecutorId(executorAddress) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
        cores, cores, logUrls)
      CoarseGrainedSchedulerBackend.this.synchronized {
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
        }
      }
      context.reply(RegisteredExecutor(executorAddress.host))
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      makeOffers()
    }
  case StopDriver =>
    context.reply(true)
    stop()
  case StopExecutors =>
    logInfo("Asking each executor to shut down")
    for ((_, executorData) <- executorDataMap) {
      executorData.executorEndpoint.send(StopExecutor)
    }
    context.reply(true)
  case RemoveExecutor(executorId, reason) =>
    removeExecutor(executorId, reason)
    context.reply(true)
  case RetrieveSparkProps =>
    context.reply(sparkProperties)
}

From CoarseGrainedSchedulerBackend's receiveAndReply method we can see how the RegisterExecutor message is handled, i.e., the Executor registration process:

  1. Check whether executorDataMap already contains the executorId. If it does, reply with a RegisterExecutorFailed message, because an Executor with the same executorId is already running.

  2. Register the Executor by obtaining the executorAddress.

  3. Update three data structures: addressToExecutorId (a DriverEndpoint data structure mapping the RPC address, i.e., hostname and port, to the executorId), totalCoreCount (the total number of cores in the cluster), and totalRegisteredExecutors (the total number of currently registered Executors; these last two are CoarseGrainedSchedulerBackend data structures).

  4. Create an ExecutorData object from executorRef, executorRef.address, the hostname, cores, etc.

  5. Wrap the update in the code block CoarseGrainedSchedulerBackend.this.synchronized: many Executors in the cluster register with the Driver concurrently, so the block is synchronized to prevent write conflicts.

  6. Reply to the sender with a RegisteredExecutor message via context.reply. The sender is CoarseGrainedExecutorBackend; when CoarseGrainedExecutorBackend receives the RegisteredExecutor message, it creates the Executor, and the Executor is responsible for the real Task computation.

  7. CoarseGrainedExecutorBackend's receive method:

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor(hostname) =>
    logInfo("Successfully registered with driver")
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  case RegisterExecutorFailed(message) =>
    logError("Slave registration failed: " + message)
    System.exit(1)
  case LaunchTask(data) =>
    if (executor == null) {
      logError("Received LaunchTask command but executor was null")
      System.exit(1)
    } else {
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
        taskDesc.name, taskDesc.serializedTask)
    }
    ......................

    The Executor creates a threadPool: multi-threaded concurrent execution and thread reuse are used to execute the Tasks sent by Spark efficiently. After the thread pool is created, it waits for the Driver to send tasks to CoarseGrainedExecutorBackend rather than directly to the Executor, because the Executor is not a message loop body. A rough sketch of such a pool follows.
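    As an illustration of such a pool (a sketch built with plain JDK APIs, not Spark's ThreadUtils code), a cached pool of named daemon threads can be created like this:

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Sketch: a cached daemon-thread pool in the spirit of the Executor's task
// launch pool. Idle threads are reused; new threads are created on demand
// when all existing ones are busy.
object TaskLaunchPoolSketch {
  private val counter = new AtomicInteger(0)

  private val factory: ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"task-launch-worker-${counter.getAndIncrement()}")
      t.setDaemon(true) // daemon threads never block JVM shutdown
      t
    }
  }

  val pool: ExecutorService = Executors.newCachedThreadPool(factory)

  def main(args: Array[String]): Unit = {
    pool.execute(new Runnable {
      override def run(): Unit =
        println(s"running on ${Thread.currentThread().getName}")
    })
    Thread.sleep(100) // give the daemon thread a moment to print before exit
    pool.shutdown()
  }
}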

    How does the Executor work?

    When the Driver sends a Task, it actually sends it to the RpcEndpoint CoarseGrainedExecutorBackend rather than directly to the Executor (the Executor is not a message loop body and never receives remote messages directly).

    The Driver sends LaunchTask to CoarseGrainedExecutorBackend, which hands the work to a thread in the thread pool. It first checks whether the Executor is null: if it is null, the process exits directly; if not, it deserializes the task and calls the Executor's launchTask, submitting the task to the Executor for execution.

    After launchTask receives the Task, it first wraps the Task in a TaskRunner and then puts it into runningTasks, a data structure that tracks the tasks currently being executed.
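    This wrap-and-track pattern can be sketched as follows (simplified stand-ins, not Spark's TaskRunner or Executor source): the task body is wrapped in a Runnable, recorded in a concurrent map keyed by taskId, submitted to the pool, and removed from the map when it finishes.

import java.util.concurrent.{ConcurrentHashMap, Executors}

// Sketch of the launchTask pattern described above (not Spark source).
object LaunchTaskSketch {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

  class TaskRunner(val taskId: Long, body: () => Unit) extends Runnable {
    override def run(): Unit =
      try body()
      finally runningTasks.remove(taskId) // stop tracking once the task is done
  }

  def launchTask(taskId: Long)(body: => Unit): Unit = {
    val runner = new TaskRunner(taskId, () => body)
    runningTasks.put(taskId, runner) // track the task before it starts running
    threadPool.execute(runner)
  }

  def main(args: Array[String]): Unit = {
    launchTask(1L) { println(s"task 1 on ${Thread.currentThread().getName}") }
    launchTask(2L) { println(s"task 2 on ${Thread.currentThread().getName}") }
    threadPool.shutdown() // previously submitted tasks still run to completion
  }
}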

    The run method of the Executor's TaskRunner (TaskRunner is an inner class of Executor) ultimately calls the Task.run method. The fragment of TaskRunner's run method that calls Task's run method:

  8. ............................
    
    var threwException = true
    val (value, accumUpdates) = try {
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    }
    ...........................................

    Source code of class Task's run method:

  9. final def run(taskAttemptId: Long, attemptNumber: Int, metricsSystem: MetricsSystem)
    : (T, AccumulatorUpdates) = {
      context = new TaskContextImpl(
        stageId, partitionId, taskAttemptId, attemptNumber, taskMemoryManager, metricsSystem,
        internalAccumulators, runningLocally = false)
      TaskContext.setTaskContext(context)
      context.taskMetrics.setHostname(Utils.localHostName())
      context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
      taskThread = Thread.currentThread()
      if (_killed) {
        kill(interruptThread = false)
      }
      try { // mainly calls the runTask method
        (runTask(context), context.collectAccumulators())
      } finally {
        context.markTaskCompleted()
    .................................

    Task is an abstract class. Task's run method is a concrete instance method, while Task's runTask method is abstract. Task has two subclasses, ShuffleMapTask and ResultTask; which subclass's runTask method gets executed depends on the task itself. The difference between the runTask methods of ShuffleMapTask and ResultTask is whether the task performs a shuffle operation (whether a shuffle write is executed inside runTask).
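    This division of labor can be sketched as follows (SketchTask and its subclasses are simplified stand-ins, not Spark's Task hierarchy): run is the concrete, shared entry point, runTask is abstract, and the two subclasses differ in whether runTask performs a shuffle write.

// Sketch of the Task hierarchy described above (not Spark source).
abstract class SketchTask[T](val partitionId: Int) {
  // Concrete, shared entry point: set up, then delegate to runTask.
  final def run(taskAttemptId: Long): T = {
    println(s"task $taskAttemptId on partition $partitionId starting")
    runTask()
  }
  // Abstract: each subclass decides what the task actually computes.
  protected def runTask(): T
}

// Shuffle-stage task: would write shuffle output for downstream stages
// and return only the status of where that output lives.
class SketchShuffleMapTask(partitionId: Int) extends SketchTask[String](partitionId) {
  protected def runTask(): String = {
    // ... compute the partition's records, then perform the shuffle write ...
    "MapStatus(shuffle output location)"
  }
}

// Final-stage task: applies the user function to the partition and
// returns the computed result directly.
class SketchResultTask(partitionId: Int) extends SketchTask[Int](partitionId) {
  protected def runTask(): Int = {
    // ... apply the user function over the partition ...
    42 // placeholder result
  }
}

object TaskHierarchySketch {
  def main(args: Array[String]): Unit = {
    println(new SketchShuffleMapTask(0).run(taskAttemptId = 1L))
    println(new SketchResultTask(0).run(taskAttemptId = 2L))
  }
}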


Copyright notice
This article was written by [ZH519080]; please include the original link when reposting. Thanks.
https://yzsam.com/2022/173/202206221523254447.html