
A detailed look at Spark's source-code support for YARN priority

2022-06-25 11:21:00 pyiran

YARN schedulers
Using priority in YARN
Priority support in Spark on YARN
References

YARN schedulers

YARN provides two schedulers, the Capacity Scheduler and the Fair Scheduler, and both support priority. Here we briefly introduce the concepts without going into much detail.

Capacity Scheduler

The Capacity Scheduler is designed to let applications on Hadoop share cluster resources and run in a multi-tenant fashion. It is typically used on large shared clusters, allocating resources to specific user groups through queues. Queues can be configured with simple restrictions on cluster resources or on the users within a queue (minimum guarantees, maximum limits, and so on), and when a queue is idle, YARN can temporarily lend its spare resources to other queues.

Fair Scheduler

The Fair Scheduler, as its name suggests, allocates resources according to a fairness principle: over time, running applications tend to receive equal shares of resources. If only one application is running on the cluster, it can use all available resources; when another application is submitted, free resources are assigned to the newcomer, so that each running application ends up with an equal share.

Using priority in YARN

Capacity Scheduler

The Capacity Scheduler supports setting an application's priority. In YARN, priority is an integer, and a larger number means a higher priority; the feature is only supported under the FIFO (default) ordering policy. Priority takes effect at the cluster level and at the queue level (a configuration sketch follows the list below).

  1. Cluster level: if an application's priority exceeds the cluster-wide maximum, it is treated as having that maximum priority.
  2. Queue level: each queue has a default priority value, and applications submitted to the queue without an explicit priority receive that default. If an application is moved to another queue, its priority value does not change.
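
As a rough illustration of the two levels, here is a minimal configuration sketch. The property names follow the Hadoop 3.x Capacity Scheduler documentation; the queue name prod and the values are made up:

  <!-- yarn-site.xml: cluster-wide maximum; requests above it are clamped -->
  <property>
    <name>yarn.cluster.max-application-priority</name>
    <value>10</value>
  </property>

  <!-- capacity-scheduler.xml: default priority for apps submitted to queue "prod" -->
  <property>
    <name>yarn.scheduler.capacity.root.prod.default-application-priority</name>
    <value>5</value>
  </property>

The priority of a running application can also be updated afterwards with yarn application -appId <id> -updatePriority <priority>.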

Fair Scheduler

The Fair Scheduler supports moving a running application into a queue with a different priority, so the application's resource weight changes along with the queue. The migrated application's resources are then counted against the new queue; if the resources it requires exceed the new queue's maximum limit, the move fails.
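
For example, a move can be triggered with the yarn CLI's movetoqueue subcommand (the application id below is a placeholder):

  # Move a running application to another queue; the move fails if the app's
  # resources would exceed the target queue's maximum limit.
  yarn application -movetoqueue application_1650000000000_0001 -queue target_queue

Newer Hadoop versions deprecate movetoqueue in favor of yarn application -appId <id> -changeQueue <queue>.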

Priority support in Spark on YARN

How to set the priority of a Spark app

You simply set it in SparkConf. Following YARN's definition of priority, the larger the value, the higher the priority, and among jobs submitted at the same time, the higher-priority ones obtain resources first:

// Note: APPLICATION_TAGS, MAX_APP_ATTEMPTS, QUEUE_NAME and APPLICATION_PRIORITY
// are internal config entries from org.apache.spark.deploy.yarn.config.
val sparkConf = new SparkConf()
      .set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
      .set(MAX_APP_ATTEMPTS, 42)
      .set("spark.app.name", "foo-test-app")
      .set(QUEUE_NAME, "staging-queue")
      .set(APPLICATION_PRIORITY, 1) // larger value = higher priority
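
The snippet above comes from Spark's own test code and uses internal constants. In a regular user application the same settings go through the public string keys; a minimal sketch, assuming a Spark version that ships this support (3.0+), with the queue name and priority value carried over from the example above:

  import org.apache.spark.SparkConf

  // User-side equivalent via public string keys.
  val conf = new SparkConf()
    .setAppName("foo-test-app")
    .set("spark.yarn.queue", "staging-queue") // target YARN queue
    .set("spark.yarn.priority", "1")          // higher value = higher priority (FIFO policy only)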

Spark source code

Spark already has official support for YARN priority; see the closed Jira SPARK-10879 for an example. That Jira targets a very old version, so its diff is for reference only, to help understand the basic flow of how Spark on YARN sets priority.
Supporting priority is actually quite simple. One part is accepting a priority setting at submit time, which officially is done through SparkConf; the other is calling setPriority inside createApplicationSubmissionContext to pass the priority on to YARN. Here is the code at the key places:

  1. /resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala, the new config entry:
  private[spark] val APPLICATION_PRIORITY = ConfigBuilder("spark.yarn.priority")
    .doc("Application priority for YARN to define pending applications ordering policy, those" +
      " with higher value have a better opportunity to be activated. Currently, YARN only" +
      " supports application priority when using FIFO ordering policy.")
    .intConf
    .createOptional
  2. /resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala, the change in the createApplicationSubmissionContext function:
  /**
   * Set up the context for submitting our ApplicationMaster.
   * This uses the YarnClientApplication not available in the Yarn alpha API.
   */
  def createApplicationSubmissionContext(
      newApp: YarnClientApplication,
      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {

    val componentName = if (isClusterMode) {
      config.YARN_DRIVER_RESOURCE_TYPES_PREFIX
    } else {
      config.YARN_AM_RESOURCE_TYPES_PREFIX
    }
    val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName)
    val amResources = yarnAMResources ++
      getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)
    logDebug(s"AM resources: $amResources")
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
    appContext.setQueue(sparkConf.get(QUEUE_NAME))
    appContext.setAMContainerSpec(containerContext)
    appContext.setApplicationType("SPARK")

    sparkConf.get(APPLICATION_TAGS).foreach { tags =>
      appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
    }
    sparkConf.get(MAX_APP_ATTEMPTS) match {
      case Some(v) => appContext.setMaxAppAttempts(v)
      case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +
          "Cluster's default value will be used.")
    }

    sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
      appContext.setAttemptFailuresValidityInterval(interval)
    }

    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(amMemory + amMemoryOverhead)
    capability.setVirtualCores(amCores)
    if (amResources.nonEmpty) {
      ResourceRequestHelper.setResourceRequests(amResources, capability)
    }
    logDebug(s"Created resource capability for AM request: $capability")

    sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
      case Some(expr) =>
        val amRequest = Records.newRecord(classOf[ResourceRequest])
        amRequest.setResourceName(ResourceRequest.ANY)
        amRequest.setPriority(Priority.newInstance(0))
        amRequest.setCapability(capability)
        amRequest.setNumContainers(1)
        amRequest.setNodeLabelExpression(expr)
        appContext.setAMContainerResourceRequest(amRequest)
      case None =>
        appContext.setResource(capability)
    }

    sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
      try {
        val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
        logAggregationContext.setRolledLogsIncludePattern(includePattern)
        sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
          logAggregationContext.setRolledLogsExcludePattern(excludePattern)
        }
        appContext.setLogAggregationContext(logAggregationContext)
      } catch {
        case NonFatal(e) =>
          logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
            "does not support it", e)
      }
    }
    appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)

    // The key change for this article: APPLICATION_PRIORITY is an optional
    // config (Option[Int]), so it is only passed on to YARN when the user set it.
    sparkConf.get(APPLICATION_PRIORITY).foreach { appPriority =>
      appContext.setPriority(Priority.newInstance(appPriority))
    }
    appContext
  }
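
Putting it together, a submission could look like the following sketch (the class name and jar are placeholders), and the chosen priority should then be visible for the application in the ResourceManager UI:

  spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --queue staging-queue \
    --conf spark.yarn.priority=1 \
    --class com.example.Main \
    app.jar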

References

Yarn Capacity Scheduler: https://hadoop.apache.org/docs/r3.1.1/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
Yarn Fair Scheduler: https://hadoop.apache.org/docs/r3.1.1/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
Spark Jira: https://issues.apache.org/jira/browse/SPARK-10879
Hadoop Yarn API: https://hadoop.apache.org/docs/r3.2.1/api/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.html
