Spark History Server performance improvement (I) -- Application List
2022-06-25 11:21:00 【pyiran】
The Spark History Server home page
The Spark History Server (SHS) home page shows the list of all applications within the retention window (spark.history.fs.cleaner.maxAge, default 7d), including applicationId, name, attemptId, start time, end time, duration, user, and an event log download link.
How SHS generates the application list
SHS builds the application list from the event logs, so it cannot avoid reading and parsing them. From the code, the pieces that must be parsed are SparkListenerApplicationStart, SparkListenerApplicationEnd, SparkListenerEnvironmentUpdate, and the Spark version (from SparkListenerLogStart).
While an application is still running, SHS can produce the required list entry without reading SparkListenerApplicationEnd, so parsing a small prefix of the event log is enough.
But for a finished application, the end event must be read to obtain the end time, and there is no guarantee that the last record in the event log is the end event. So the earliest versions of SHS had to traverse all events to build the list. The generated list information is cached in LevelDB, in listing.ldb. The listener that collects this information, AppListingListener, is excerpted below:
private[history] class AppListingListener(
    reader: EventLogFileReader,
    clock: Clock,
    haltEnabled: Boolean) extends SparkListener {

  private val app = new MutableApplicationInfo()
  private val attempt = new MutableAttemptInfo(reader.rootPath.getName(),
    reader.fileSizeForLastIndex, reader.lastIndex)

  private var gotEnvUpdate = false
  private var halted = false

  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    app.id = event.appId.orNull
    app.name = event.appName

    attempt.attemptId = event.appAttemptId
    attempt.startTime = new Date(event.time)
    attempt.lastUpdated = new Date(clock.getTimeMillis())
    attempt.sparkUser = event.sparkUser

    checkProgress()
  }

  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
    attempt.endTime = new Date(event.time)
    attempt.lastUpdated = new Date(reader.modificationTime)
    attempt.duration = event.time - attempt.startTime.getTime()
    attempt.completed = true
  }

  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
    // Only parse the first env update, since any future changes don't have any effect on
    // the ACLs set for the UI.
    if (!gotEnvUpdate) {
      def emptyStringToNone(strOption: Option[String]): Option[String] = strOption match {
        case Some("") => None
        case _ => strOption
      }

      val allProperties = event.environmentDetails("Spark Properties").toMap
      attempt.viewAcls = emptyStringToNone(allProperties.get(UI_VIEW_ACLS.key))
      attempt.adminAcls = emptyStringToNone(allProperties.get(ADMIN_ACLS.key))
      attempt.viewAclsGroups = emptyStringToNone(allProperties.get(UI_VIEW_ACLS_GROUPS.key))
      attempt.adminAclsGroups = emptyStringToNone(allProperties.get(ADMIN_ACLS_GROUPS.key))

      gotEnvUpdate = true
      checkProgress()
    }
  }

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerLogStart(sparkVersion) =>
      attempt.appSparkVersion = sparkVersion
    case _ =>
  }

  def applicationInfo: Option[ApplicationInfoWrapper] = {
    if (app.id != null) {
      Some(app.toView())
    } else {
      None
    }
  }

  // checkProgress() and the MutableApplicationInfo/MutableAttemptInfo helpers are omitted here.
}
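The checkProgress() calls above are what let SHS stop replaying early for running applications. The helper is not shown in the excerpt; in the Spark source it looks roughly like the following: once the application id and the first environment update have been seen, replay is halted by throwing an exception that the replay bus catches.

  // Rough shape of the halting logic (for context; see FsHistoryProvider.scala
  // in the Spark source for the exact code). Throws HaltReplayException to stop
  // the replay once the listing has everything it needs from the head of the log.
  private def checkProgress(): Unit = {
    if (haltEnabled && !halted && app.id != null && gotEnvUpdate) {
      halted = true
      throw new HaltReplayException()
    }
  }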
For long-running jobs such as streaming jobs, the event log eventually becomes very large, and traversing every event just to extract a small amount of listing information is clearly inefficient, so SHS introduced a fast-parsing feature. As noted above, the end event is not necessarily the last line, but it does appear near the end of the event log. By enabling "spark.history.fs.inProgressOptimization.enabled" and setting "spark.history.fs.endEventReparseChunkSize", we can turn on fast parsing. The idea is that when an application has ended, SHS looks for the end event only within the last endEventReparseChunkSize bytes of the log. The relevant excerpt from FsHistoryProvider.scala:
val lookForEndEvent = shouldHalt && (appCompleted || !fastInProgressParsing)
if (lookForEndEvent && listener.applicationInfo.isDefined) {
  val lastFile = logFiles.last
  Utils.tryWithResource(EventLogFileReader.openEventLog(lastFile.getPath, fs)) { in =>
    val target = lastFile.getLen - reparseChunkSize
    if (target > 0) {
      logInfo(s"Looking for end event; skipping $target bytes from $logPath...")
      var skipped = 0L
      while (skipped < target) {
        skipped += in.skip(target - skipped)
      }
    }

    val source = Source.fromInputStream(in).getLines()

    // Because skipping may leave the stream in the middle of a line, read the next line
    // before replaying.
    if (target > 0) {
      source.next()
    }

    bus.replay(source, lastFile.getPath.toString, !appCompleted, eventsFilter)
  }
}

logInfo(s"Finished parsing $logPath")
Improving SHS performance
To further speed up generating the application list, we can take advantage of the HDFS extended attributes (xattr) feature. See:
SPARK-23607
Spark Driver
The driver's part is small: while writing the event log, it also creates extended attributes on the log file, for example "user.startTime". This can be added in EventLoggingListener.scala with little effort.
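A minimal sketch of what the driver-side change could look like, using the Hadoop FileSystem xattr API. The helper name and the user.appId/user.appName attributes are assumptions for illustration (only user.startTime comes from the text above), and this is not the exact patch from SPARK-23607:

  import java.nio.charset.StandardCharsets
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Hypothetical helper: stamp listing metadata onto the event log file as
  // HDFS extended attributes, so the history server can read the listing
  // fields without parsing the log. User-namespace names need the "user." prefix.
  def writeListingXAttrs(
      fs: FileSystem,
      logPath: Path,
      appId: String,
      appName: String,
      startTime: Long): Unit = {
    def set(name: String, value: String): Unit =
      fs.setXAttr(logPath, name, value.getBytes(StandardCharsets.UTF_8))

    set("user.appId", appId)
    set("user.appName", appName)
    set("user.startTime", startTime.toString)
  }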
SHS
What SHS needs to do is, when generating the list, read the event log's extended attributes instead of the log contents. These attributes are stored in the NameNode's memory, so reading them is very fast. The main changes are concentrated in FsHistoryProvider.scala.
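Correspondingly, a hypothetical sketch of the read side (again, not the actual SPARK-23607 patch): FsHistoryProvider could try the extended attributes first and fall back to the replay-based parsing when they are missing, e.g. for logs written by an older driver.

  import java.nio.charset.StandardCharsets
  import scala.util.Try
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Hypothetical fast path: build a listing entry from xattrs alone.
  // fs.getXAttr throws when an attribute is absent, so wrap it in Try and
  // return None to let the caller fall back to replaying the event log.
  def readListingXAttrs(fs: FileSystem, logPath: Path): Option[(String, String, Long)] = {
    def get(name: String): Option[String] =
      Try(new String(fs.getXAttr(logPath, name), StandardCharsets.UTF_8)).toOption

    for {
      appId <- get("user.appId")
      appName <- get("user.appName")
      startTime <- get("user.startTime").flatMap(s => Try(s.toLong).toOption)
    } yield (appId, appName, startTime)
  }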