Detailed explanation of Spark's source-code support for YARN priority
2022-06-25 11:21:00 【pyiran】
Contents:
- YARN schedulers
- Priority in YARN
- Priority support in Spark on YARN
- References
YARN schedulers
YARN provides the Capacity Scheduler and the Fair Scheduler, both of which support priority. Here we only briefly introduce the concepts without going into detail.
Capacity Scheduler
The Capacity Scheduler is designed to let applications on Hadoop share cluster resources in a multi-tenant fashion. It is typically used on large shared clusters, allocating resources to specific user groups through queues. Queues can be configured with limits on cluster resource usage and on the users within a queue (minimum guarantees, maximum limits, and so on). When a queue is idle, YARN can temporarily lend its spare resources to the other queues.
Fair Scheduler
The Fair Scheduler, as its name suggests, allocates resources according to a fairness principle: over time, the resources given to running applications tend toward equal shares. If only one application is running on the cluster, it can use all available resources. When another application is submitted, free resources are assigned to the newcomer, so that each running application ends up with an equal share.
Priority in YARN
Capacity Scheduler
The Capacity Scheduler supports setting a per-application priority. In YARN, priority is an integer, and a larger value means a higher priority; the feature is only supported under the FIFO (default) ordering policy. Priority can be applied at the cluster level or at the queue level (see the client-side sketch after this list):
- cluster level: if an application's priority is set higher than the cluster-wide maximum, it is treated as having the cluster's maximum priority.
- queue level: each queue has a default priority value, and applications in the queue that do not set an explicit priority are given this default. If an application changes queue, its priority value does not change.
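As an aside, priority can also be changed after submission. Below is a minimal client-side sketch, assuming Hadoop 2.8+ (where YarnClient.updateApplicationPriority and ApplicationId.fromString are available); the application id and the value 100 are placeholders:

import org.apache.hadoop.yarn.api.records.{ApplicationId, Priority}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Connect to the ResourceManager configured in yarn-site.xml on the classpath.
val yarnClient = YarnClient.createYarnClient()
yarnClient.init(new YarnConfiguration())
yarnClient.start()

val appId = ApplicationId.fromString("application_1650000000000_0001") // placeholder
// Ask the RM to raise the application's priority; per the cluster-level rule above,
// a value beyond the cluster maximum is capped, and the returned Priority is the
// value that actually took effect.
val effective = yarnClient.updateApplicationPriority(appId, Priority.newInstance(100))

yarnClient.stop()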
Fair Scheduler
The Fair Scheduler supports moving a running application to a queue with a different priority (weight), so that the application's resource share changes with the queue. The migrated application's resources are then accounted against the new queue; if the resources it needs exceed the new queue's maximum limit, the migration fails. A short sketch follows.
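Programmatically, such a migration goes through the YARN client API. A minimal sketch, reusing the yarnClient and appId from the sketch above (the target queue name is a placeholder):

// Moves the running application to another queue; throws a YarnException
// if the move would violate the target queue's limits.
yarnClient.moveApplicationAcrossQueues(appId, "high-priority-queue")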
Priority support in Spark on YARN
Setting the priority of a Spark app
Simply set it in SparkConf. Following YARN's definition of priority, the larger the value, the higher the priority, and among jobs submitted at the same time the higher-priority ones obtain resources first:
val sparkConf = new SparkConf()
  .set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
  .set(MAX_APP_ATTEMPTS, 42)
  .set("spark.app.name", "foo-test-app")
  .set(QUEUE_NAME, "staging-queue")
  .set(APPLICATION_PRIORITY, 1)
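Note that APPLICATION_TAGS, MAX_APP_ATTEMPTS, QUEUE_NAME and APPLICATION_PRIORITY are private[spark] constants from org.apache.spark.deploy.yarn.config (the definition of APPLICATION_PRIORITY is shown below), so the snippet above only compiles inside Spark's own code base. A minimal user-facing sketch with the equivalent public string keys, assuming Spark 3.0+ where spark.yarn.priority is available:

import org.apache.spark.SparkConf

// User-side equivalent via public config keys (app name and queue are placeholders).
val sparkConf = new SparkConf()
  .setAppName("foo-test-app")
  .set("spark.yarn.queue", "staging-queue")  // QUEUE_NAME
  .set("spark.yarn.priority", "1")           // APPLICATION_PRIORITY; FIFO ordering policy only

The same key can also be passed at submit time, e.g. spark-submit --conf spark.yarn.priority=1.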
Spark source code
Spark already has official support for YARN priority; here we use the closed Jira ticket SPARK-10879 as an example. That Jira targets a very old version, so its diff is for reference only; it serves to illustrate the basic flow of how Spark on YARN sets priority.
Supporting priority is actually quite simple. One part is letting the priority parameter be set at submit time, which is officially done through SparkConf; the other is calling setPriority inside createApplicationSubmissionContext to pass the priority on to YARN. The key code:
- /resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
private[spark] val APPLICATION_PRIORITY = ConfigBuilder("spark.yarn.priority")
  .doc("Application priority for YARN to define pending applications ordering policy, those" +
    " with higher value have a better opportunity to be activated. Currently, YARN only" +
    " supports application priority when using FIFO ordering policy.")
  .intConf
  .createOptional
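Because the entry is built with createOptional, sparkConf.get(APPLICATION_PRIORITY) returns an Option[Int] that is None whenever the user did not set spark.yarn.priority; this is why the submission code below only calls setPriority inside a foreach.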
- /resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala, the change inside the createApplicationSubmissionContext function:
/**
 * Set up the context for submitting our ApplicationMaster.
 * This uses the YarnClientApplication not available in the Yarn alpha API.
 */
def createApplicationSubmissionContext(
    newApp: YarnClientApplication,
    containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
  // Resolve the resource prefix for the AM (the driver, in cluster mode).
  val componentName = if (isClusterMode) {
    config.YARN_DRIVER_RESOURCE_TYPES_PREFIX
  } else {
    config.YARN_AM_RESOURCE_TYPES_PREFIX
  }
  val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName)
  val amResources = yarnAMResources ++
    getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)
  logDebug(s"AM resources: $amResources")

  val appContext = newApp.getApplicationSubmissionContext
  appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
  appContext.setQueue(sparkConf.get(QUEUE_NAME))
  appContext.setAMContainerSpec(containerContext)
  appContext.setApplicationType("SPARK")

  sparkConf.get(APPLICATION_TAGS).foreach { tags =>
    appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
  }
  sparkConf.get(MAX_APP_ATTEMPTS) match {
    case Some(v) => appContext.setMaxAppAttempts(v)
    case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +
      "Cluster's default value will be used.")
  }

  sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
    appContext.setAttemptFailuresValidityInterval(interval)
  }

  val capability = Records.newRecord(classOf[Resource])
  capability.setMemory(amMemory + amMemoryOverhead)
  capability.setVirtualCores(amCores)
  if (amResources.nonEmpty) {
    ResourceRequestHelper.setResourceRequests(amResources, capability)
  }
  logDebug(s"Created resource capability for AM request: $capability")

  sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
    case Some(expr) =>
      val amRequest = Records.newRecord(classOf[ResourceRequest])
      amRequest.setResourceName(ResourceRequest.ANY)
      amRequest.setPriority(Priority.newInstance(0))
      amRequest.setCapability(capability)
      amRequest.setNumContainers(1)
      amRequest.setNodeLabelExpression(expr)
      appContext.setAMContainerResourceRequest(amRequest)
    case None =>
      appContext.setResource(capability)
  }

  sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
    try {
      val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
      logAggregationContext.setRolledLogsIncludePattern(includePattern)
      sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
        logAggregationContext.setRolledLogsExcludePattern(excludePattern)
      }
      appContext.setLogAggregationContext(logAggregationContext)
    } catch {
      case NonFatal(e) =>
        logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
          "does not support it", e)
    }
  }
  appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)

  // The change that matters here: forward spark.yarn.priority (APPLICATION_PRIORITY)
  // to YARN when the user has set it.
  sparkConf.get(APPLICATION_PRIORITY).foreach { appPriority =>
    appContext.setPriority(Priority.newInstance(appPriority))
  }
  appContext
}
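After submission, the effective priority is visible in the ResourceManager UI, and on Hadoop 2.8+ the command yarn application -appId <applicationId> -updatePriority <priority> can change it for an already-running application.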
References
YARN Capacity Scheduler: https://hadoop.apache.org/docs/r3.1.1/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
YARN Fair Scheduler: https://hadoop.apache.org/docs/r3.1.1/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
Spark Jira SPARK-10879: https://issues.apache.org/jira/browse/SPARK-10879
Hadoop YARN API (ApplicationSubmissionContext): https://hadoop.apache.org/docs/r3.2.1/api/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.html