Detailed explanation of Spark's source code support for YARN priority
2022-06-25 11:21:00 【pyiran】
The YARN schedulers
Using priority in YARN
Priority support in Spark on YARN
References
The YARN schedulers
YARN provides two main schedulers, the Capacity Scheduler and the Fair Scheduler, and both support priorities. Here we only sketch the concepts without going into too much detail.
Capacity Scheduler
The Capacity Scheduler is designed to let applications on Hadoop share cluster resources in a multi-tenant fashion. It is typically used in large shared clusters, where resources are allocated to specific user groups through queues. We can configure per-queue limits on cluster resources, or per-user limits within a queue (minimum guarantees, maximum caps, and so on). When a queue is idle, YARN can temporarily lend its unused resources to other queues.
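For concreteness, a minimal sketch of such queue limits in capacity-scheduler.xml; the queue names and percentages here are made up for illustration, not from the original article:
<!-- capacity-scheduler.xml: two queues sharing the cluster -->
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>prod,dev</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.prod.capacity</name>
  <value>70</value>
  <!-- guaranteed share, as a percentage of the parent queue -->
</property>
<property>
  <name>yarn.scheduler.capacity.root.dev.capacity</name>
  <value>30</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
  <value>50</value>
  <!-- hard cap, even when the rest of the cluster is idle -->
</property>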
Fair Scheduler
The Fair Scheduler lives up to its name: it allocates resources according to a fairness principle, so that over time running applications converge to equal shares. If only one application is running on the cluster, it can use all available resources; when a second application is submitted, free resources are assigned to it, so that each running application ends up with an equal share.
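When the shares should be skewed rather than strictly equal, fair-scheduler.xml supports per-queue weights. A minimal sketch, with made-up queue names and weights:
<!-- fair-scheduler.xml: prod receives twice dev's fair share -->
<allocations>
  <queue name="prod">
    <weight>2.0</weight>
  </queue>
  <queue name="dev">
    <weight>1.0</weight>
  </queue>
</allocations>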
Using priority in YARN
Capacity Scheduler
The Capacity Scheduler supports setting an application priority. In YARN, priority is an integer, and a larger value means a higher priority; this feature is only supported under the FIFO (default) ordering policy. Priority can be applied at the cluster level or the queue level (a config sketch follows the list below).
- cluster level: if an application requests a priority higher than the cluster-wide maximum, it is treated as having the cluster's maximum priority.
- queue level: a queue has a default priority value, and applications in that queue which do not set an explicit priority receive the default. If an application later changes queues, its priority value does not change.
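The corresponding knobs live in yarn-site.xml (cluster level) and capacity-scheduler.xml (queue level). A minimal sketch; the queue name and values are purely illustrative:
<!-- yarn-site.xml: cluster-wide maximum; higher requests are clamped to this value -->
<property>
  <name>yarn.cluster.max-application-priority</name>
  <value>10</value>
</property>
<!-- capacity-scheduler.xml: default priority for apps submitted to this queue -->
<property>
  <name>yarn.scheduler.capacity.root.staging-queue.default-application-priority</name>
  <value>3</value>
</property>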
Fair Scheduler
The Fair Scheduler supports moving a running application to a queue with a different priority, in which case the application's resource weight follows the new queue. The migrated application's resources are charged to the new queue, and if the resources it needs exceed the new queue's maximum limit, the migration fails.
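Such a migration can be triggered from the YARN CLI; a sketch, with a placeholder application id and queue name:
yarn application -movetoqueue application_1650000000000_0001 -queue target-queue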
Priority support in Spark on YARN
How to set the priority of a Spark app
Simply set it in SparkConf. Following YARN's definition of priority, the larger the value, the higher the priority; among jobs submitted at the same time, the higher-priority ones acquire resources first:
// config entries (APPLICATION_TAGS, QUEUE_NAME, ...) come from org.apache.spark.deploy.yarn.config
val sparkConf = new SparkConf()
  .set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
  .set(MAX_APP_ATTEMPTS, 42)
  .set("spark.app.name", "foo-test-app")
  .set(QUEUE_NAME, "staging-queue")
  .set(APPLICATION_PRIORITY, 1)
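Equivalently, since APPLICATION_PRIORITY maps to spark.yarn.priority, the priority can be passed on the command line at submit time; a sketch, with placeholder class and jar names:
spark-submit \
  --master yarn \
  --queue staging-queue \
  --conf spark.yarn.priority=1 \
  --class com.example.FooApp \
  foo-test-app.jar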
Spark source code
Spark already has official support for YARN priority; see the closed JIRA ticket SPARK-10879. The ticket targets a very old version, so its diff is for reference only, but it shows the basic flow of how Spark on YARN sets the priority.
Supporting priority is actually simple. One part is exposing a priority parameter at submit time, which officially is done through SparkConf; the other is calling setPriority in createApplicationSubmissionContext to pass the priority on to YARN. Here is the code for the key places:
- /resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
private[spark] val APPLICATION_PRIORITY = ConfigBuilder("spark.yarn.priority")
  .doc("Application priority for YARN to define pending applications ordering policy, those" +
    " with higher value have a better opportunity to be activated. Currently, YARN only" +
    " supports application priority when using FIFO ordering policy.")
  .intConf
  .createOptional
- /resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala, the change in the createApplicationSubmissionContext function. Note that because the entry above is created with createOptional, sparkConf.get(APPLICATION_PRIORITY) returns an Option[Int], which is why the code below uses foreach:
/**
 * Set up the context for submitting our ApplicationMaster.
 * This uses the YarnClientApplication not available in the Yarn alpha API.
 */
def createApplicationSubmissionContext(
    newApp: YarnClientApplication,
    containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {

  val componentName = if (isClusterMode) {
    config.YARN_DRIVER_RESOURCE_TYPES_PREFIX
  } else {
    config.YARN_AM_RESOURCE_TYPES_PREFIX
  }
  val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName)
  val amResources = yarnAMResources ++
    getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)
  logDebug(s"AM resources: $amResources")

  val appContext = newApp.getApplicationSubmissionContext
  appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
  appContext.setQueue(sparkConf.get(QUEUE_NAME))
  appContext.setAMContainerSpec(containerContext)
  appContext.setApplicationType("SPARK")

  sparkConf.get(APPLICATION_TAGS).foreach { tags =>
    appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
  }
  sparkConf.get(MAX_APP_ATTEMPTS) match {
    case Some(v) => appContext.setMaxAppAttempts(v)
    case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +
      "Cluster's default value will be used.")
  }

  sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
    appContext.setAttemptFailuresValidityInterval(interval)
  }

  val capability = Records.newRecord(classOf[Resource])
  capability.setMemory(amMemory + amMemoryOverhead)
  capability.setVirtualCores(amCores)
  if (amResources.nonEmpty) {
    ResourceRequestHelper.setResourceRequests(amResources, capability)
  }
  logDebug(s"Created resource capability for AM request: $capability")

  sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
    case Some(expr) =>
      val amRequest = Records.newRecord(classOf[ResourceRequest])
      amRequest.setResourceName(ResourceRequest.ANY)
      amRequest.setPriority(Priority.newInstance(0))
      amRequest.setCapability(capability)
      amRequest.setNumContainers(1)
      amRequest.setNodeLabelExpression(expr)
      appContext.setAMContainerResourceRequest(amRequest)
    case None =>
      appContext.setResource(capability)
  }

  sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
    try {
      val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
      logAggregationContext.setRolledLogsIncludePattern(includePattern)
      sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
        logAggregationContext.setRolledLogsExcludePattern(excludePattern)
      }
      appContext.setLogAggregationContext(logAggregationContext)
    } catch {
      case NonFatal(e) =>
        logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
          "does not support it", e)
    }
  }
  appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)

  // The key change: if spark.yarn.priority is set, pass it through to YARN
  sparkConf.get(APPLICATION_PRIORITY).foreach { appPriority =>
    appContext.setPriority(Priority.newInstance(appPriority))
  }
  appContext
}
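After submission, the priority can be inspected and even updated through the YARN CLI; a sketch, with a placeholder application id (the commands require a YARN version with application priority support):
# Recent Hadoop versions show an "Application-Priority" field in the report
yarn application -status application_1650000000000_0001
# Raise the priority of a submitted or running application
yarn application -appId application_1650000000000_0001 -updatePriority 2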
References
YARN Capacity Scheduler: https://hadoop.apache.org/docs/r3.1.1/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
YARN Fair Scheduler: https://hadoop.apache.org/docs/r3.1.1/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
Spark JIRA: https://issues.apache.org/jira/browse/SPARK-10879
Hadoop YARN API: https://hadoop.apache.org/docs/r3.2.1/api/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.html