Processing source code of spark executor execution results
2022-06-22 16:57:00 【ZH519080】
Since Spark 1.6, communication between the Driver's BlockManagerMaster and the BlockManager no longer goes through AkkaUtil; it is based on RpcEndpoint instead.
A Spark cluster runs many Executors, and each Executor lives inside a CoarseGrainedExecutorBackend process: CoarseGrainedExecutorBackend maintains and manages its Executor. When CoarseGrainedExecutorBackend starts, it registers with the Driver by sending a RegisterExecutor message. RegisterExecutor is a case class; its source is as follows:
// Executor registration information sent to the Driver
case class RegisterExecutor(
    executorId: String,
    executorRef: RpcEndpointRef,
    hostPort: String,
    cores: Int,
    logUrls: Map[String, String])
  extends CoarseGrainedClusterMessage

The RegisterExecutor registration message is sent from the onStart method of CoarseGrainedExecutorBackend:
override def onStart() {
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    driver = Some(ref)
    ref.ask[RegisterExecutorResponse](
      RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    case Success(msg) => Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
    }
    case Failure(e) => {
      logError(s"Cannot register with driver: $driverUrl", e)
      System.exit(1)
    }
  }(ThreadUtils.sameThread)
}

On startup, CoarseGrainedExecutorBackend sends a RegisterExecutor message to the Driver to register itself. When the Driver receives RegisterExecutor and the registration succeeds, it replies with a RegisteredExecutor message to CoarseGrainedExecutorBackend. Note that what is registered here is not the Executor that actually does the work: what really registers is the ExecutorBackend, so RegisterExecutor can be read as "RegisterExecutorBackend".
Note:
1. CoarseGrainedExecutorBackend is the name of the process in which the Executor runs; CoarseGrainedExecutorBackend does not perform task computation itself.
2. The Executor is the object that actually processes tasks; internally it computes Tasks through a thread pool.
3. CoarseGrainedExecutorBackend and Executor are one-to-one.
4. CoarseGrainedExecutorBackend is a message communicator: it can receive messages from the Driver, send messages to the Driver, and so on; it inherits ThreadSafeRpcEndpoint.
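For orientation, the RpcEndpoint contract that CoarseGrainedExecutorBackend implements looks roughly as follows. This is a simplified sketch of the trait in org.apache.spark.rpc, not the exact source; the real trait has more lifecycle callbacks such as onStop and onError:

// Simplified sketch of the RpcEndpoint contract (illustrative only).
private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv
  final def self: RpcEndpointRef = rpcEnv.endpointRef(this) // a ref to this endpoint
  def onStart(): Unit = {}                                  // called before any message arrives
  def receive: PartialFunction[Any, Unit]                   // handles one-way send()
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] // handles ask()
}

// ThreadSafeRpcEndpoint is a marker trait guaranteeing that messages
// are delivered to the endpoint one at a time.
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint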
CoarseGrainedExecutorBackend sends the RegisterExecutor message to the Driver, where it is handled by SparkDeploySchedulerBackend (renamed StandaloneSchedulerBackend after Spark 2.0). SparkDeploySchedulerBackend inherits from CoarseGrainedSchedulerBackend, and its start method launches an AppClient (renamed StandaloneAppClient after Spark 2.0). Source of SparkDeploySchedulerBackend's start method:
override def start() {
  super.start() // call CoarseGrainedSchedulerBackend's start method
  launcherBackend.connect()
  // The endpoint for executors to talk to us
  val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
    RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val testingClassPath = if (sys.props.contains("spark.testing")) {
    sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
  } else {
    Nil
  }
  // Build the command that workers will use to launch Executors
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start() // start the AppClient, which registers the application with the Master
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}

There are two important Endpoints in the Driver process:
ClientEndpoint: responsible for registering the current program with the Master; an inner member class of AppClient.
DriverEndpoint: the driving endpoint of the whole program's execution; it receives the RegisterExecutor message and completes the registration on the Driver side; an inner member class of CoarseGrainedSchedulerBackend.
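Both endpoints are registered with the RpcEnv when their enclosing components start. For example, CoarseGrainedSchedulerBackend.start sets up DriverEndpoint roughly as follows (a simplified sketch based on the Spark 1.x source; the omitted lines collect scheduler properties):

// Simplified: register the DriverEndpoint under the well-known
// name "CoarseGrainedScheduler" (CoarseGrainedSchedulerBackend.ENDPOINT_NAME).
override def start() {
  ...
  driverEndpoint = rpcEnv.setupEndpoint(
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME, createDriverEndpoint(properties))
}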
The Executor's RegisterExecutor registration message is delivered to DriverEndpoint, which writes the executor's data into executorDataMap, a data structure inside CoarseGrainedSchedulerBackend. Through executorDataMap, CoarseGrainedSchedulerBackend knows all of the ExecutorBackend processes allocated to the current program, and inside each ExecutorBackend instance an Executor object is responsible for executing the actual tasks. Between the Executor and CoarseGrainedSchedulerBackend, the RegisterExecutor message is received and answered by the receiveAndReply method, which makes receiveAndReply very important. The receiveAndReply method includes the RegisterExecutor registration flow:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
    if (executorDataMap.contains(executorId)) { // executorId is already registered
      context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    } else {
      // If the address is not null, use executorRef.address as executorAddress
      val executorAddress = if (executorRef.address != null) {
        executorRef.address
      } else { // otherwise use the sender's address as executorAddress
        context.senderAddress
      }
      addressToExecutorId(executorAddress) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
        cores, cores, logUrls)
      CoarseGrainedSchedulerBackend.this.synchronized {
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
        }
      }
      context.reply(RegisteredExecutor(executorAddress.host))
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      makeOffers()
    }
  case StopDriver =>
    context.reply(true)
    stop()
  case StopExecutors =>
    logInfo("Asking each executor to shut down")
    for ((_, executorData) <- executorDataMap) {
      executorData.executorEndpoint.send(StopExecutor)
    }
    context.reply(true)
  case RemoveExecutor(executorId, reason) =>
    removeExecutor(executorId, reason)
    context.reply(true)
  case RetrieveSparkProps =>
    context.reply(sparkProperties)
}

From CoarseGrainedSchedulerBackend's receiveAndReply method we can read off how the RegisterExecutor message is processed, i.e. the Executor registration flow:
1. Check whether executorDataMap already contains the executorId. If it does, reply with a RegisterExecutorFailed message, because an Executor with that executorId is already running.
2. Otherwise carry out the Executor registration: obtain the executorAddress.
3. Update three data structures: addressToExecutorId (a DriverEndpoint structure mapping the RPC address, i.e. hostname and port, to the ExecutorId), totalCoreCount (the total number of cores in the cluster), and totalRegisteredExecutors (the total number of currently registered Executors; the latter two belong to CoarseGrainedSchedulerBackend).
4. Create an ExecutorData record holding executorRef, executorRef.address, hostname, cores, and so on (see the sketch after this list).
5. Guard the update with CoarseGrainedSchedulerBackend.this.synchronized: many Executors in the cluster register with the Driver concurrently, so this is designed as a synchronized block to prevent write conflicts.
6. Reply RegisteredExecutor to the sender, i.e. CoarseGrainedExecutorBackend. On receiving RegisteredExecutor, CoarseGrainedExecutorBackend creates the Executor, and the Executor is what actually computes Tasks.
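For reference, the ExecutorData record created in step 4 looks roughly like this (field names follow the Spark 1.6 source, slightly simplified here for illustration):

// Per-executor bookkeeping kept by the Driver (illustrative sketch).
private[cluster] class ExecutorData(
    val executorEndpoint: RpcEndpointRef, // ref used to message the executor back (e.g. LaunchTask)
    val executorAddress: RpcAddress,
    override val executorHost: String,
    var freeCores: Int,                   // decremented as tasks are scheduled onto this executor
    override val totalCores: Int,
    override val logUrlMap: Map[String, String])
  extends ExecutorInfo(executorHost, totalCores, logUrlMap)

When CoarseGrainedExecutorBackend receives the RegisteredExecutor reply, its receive method creates the Executor: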
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor(hostname) =>
    logInfo("Successfully registered with driver")
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  case RegisterExecutorFailed(message) =>
    logError("Slave registration failed: " + message)
    System.exit(1)
  case LaunchTask(data) =>
    if (executor == null) {
      logError("Received LaunchTask command but executor was null")
      System.exit(1)
    } else {
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskId = taskDesc.taskId,
        attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
    }
  ......................

The Executor creates a threadPool and uses multithreaded concurrent execution and thread reuse to run the Tasks Spark sends efficiently. After the thread pool is created, the Executor waits for the Driver to send tasks to CoarseGrainedExecutorBackend, not directly to the Executor, because the Executor is not a message loop body.
How does the Executor work?
When the Driver sends a Task, it actually sends it to the RpcEndpoint CoarseGrainedExecutorBackend rather than directly to the Executor (the Executor is not a message loop body and never receives messages directly from a remote endpoint).
The Driver sends LaunchTask to CoarseGrainedExecutorBackend, which hands the work to a thread in the thread pool. It first checks whether the Executor is null; if so, the process exits immediately. Otherwise it deserializes the task, calls the Executor's launchTask method, and submits the task to the Executor for execution.
After launchTask receives the Task, it first wraps the Task in a TaskRunner, then records it in runningTasks, a data structure tracking in-flight tasks, and submits it to the thread pool.
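A minimal sketch of Executor.launchTask as just described (the names follow the Spark 1.x source; treat this as illustrative rather than the exact implementation):

// Wrap the task in a TaskRunner, record it as running, and hand it to the thread pool.
// serializedTask is a java.nio.ByteBuffer carrying the serialized Task.
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr) // runningTasks: ConcurrentHashMap[Long, TaskRunner]
  threadPool.execute(tr)       // TaskRunner.run will eventually call Task.run
}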
The run method of the Executor's TaskRunner ultimately calls the Task.run method. Source of the call site inside TaskRunner.run:
............................
var threwException = true
val (value, accumUpdates) = try {
  val res = task.run(
    taskAttemptId = taskId,
    attemptNumber = attemptNumber,
    metricsSystem = env.metricsSystem)
  threwException = false
  res
}
...........................................

Source of the run method of the Task class:
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): (T, AccumulatorUpdates) = {
  context = new TaskContextImpl(
    stageId, partitionId, taskAttemptId, attemptNumber, taskMemoryManager, metricsSystem,
    internalAccumulators, runningLocally = false)
  TaskContext.setTaskContext(context)
  context.taskMetrics.setHostname(Utils.localHostName())
  context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    // mainly calls the runTask method
    (runTask(context), context.collectAccumulators())
  } finally {
    context.markTaskCompleted()
    .................................

Task is an abstract class: its run method is a concrete instance method, while its runTask method is abstract. Task has two subclasses, ShuffleMapTask and ResultTask, and which subclass's runTask is executed depends on the task itself. The difference between ShuffleMapTask's runTask and ResultTask's runTask is whether the task performs a shuffle, that is, whether a shuffle write is executed inside runTask.
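To make that contrast concrete, here is a heavily abridged, illustrative view of the two runTask implementations, based on the Spark 1.x sources. Deserialization is collapsed into a hypothetical deserializeTaskBinary() helper and error handling is elided:

// ShuffleMapTask.runTask (abridged): deserializes the RDD and its ShuffleDependency,
// then performs a shuffle WRITE of this partition's output and returns a MapStatus.
override def runTask(context: TaskContext): MapStatus = {
  val (rdd, dep) = deserializeTaskBinary() // hypothetical helper standing in for the real code
  val writer = SparkEnv.get.shuffleManager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
}

// ResultTask.runTask (abridged): applies the user function to the partition's iterator
// and returns the result directly; no shuffle write is performed.
override def runTask(context: TaskContext): U = {
  val (rdd, func) = deserializeTaskBinary() // hypothetical helper standing in for the real code
  func(context, rdd.iterator(partition, context))
}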