Processing source code of spark executor execution results
2022-06-22 16:57:00 【ZH519080】
Since Spark 1.6, communication between the Driver's BlockManagerMaster and the BlockManagers no longer goes through AkkaUtils; it uses RpcEndpoint instead.
A Spark cluster runs many Executors. CoarseGrainedExecutorBackend is the process in which an Executor runs, and the Executor is maintained and managed by CoarseGrainedExecutorBackend. When CoarseGrainedExecutorBackend starts, it sends a RegisterExecutor message to the Driver to register itself. RegisterExecutor is a case class; its source code is as follows:
// Executor information registered with the Driver
case class RegisterExecutor(executorId: String,executorRef: RpcEndpointRef,
hostPort: String,cores: Int,logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
The RegisterExecutor registration message is sent from the onStart method of CoarseGrainedExecutorBackend:
override def onStart() {
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
driver = Some(ref)
ref.ask[RegisterExecutorResponse](
RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
case Success(msg) => Utils.tryLogNonFatalError {
Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
}
case Failure(e) => {
logError(s"Cannot register with driver: $driverUrl", e)
System.exit(1)
}
}(ThreadUtils.sameThread)
}
When CoarseGrainedExecutorBackend starts, it sends a RegisterExecutor message to the Driver. After the Driver receives RegisterExecutor and the Executor has been registered successfully, the Driver replies to CoarseGrainedExecutorBackend with RegisteredExecutor. The Executor registered here is not the Executor that actually does the work: what is really registered is the ExecutorBackend, so RegisterExecutor can be understood as "RegisterExecutorBackend".
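The handshake in onStart follows an ask/reply pattern: resolve the driver endpoint asynchronously, ask it with RegisterExecutor, then act on the RegisterExecutorResponse. Below is a minimal sketch of that pattern with plain Scala futures. FakeDriverRef, register and the simplified message classes are hypothetical stand-ins, not Spark's RpcEndpointRef or its real message definitions.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Simplified stand-ins for Spark's registration messages (hypothetical, not Spark's classes)
sealed trait RegisterExecutorResponse
case class RegisteredExecutor(hostname: String) extends RegisterExecutorResponse
case class RegisterExecutorFailed(message: String) extends RegisterExecutorResponse
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)

// Hypothetical driver reference: `ask` replies asynchronously, in the spirit of RpcEndpointRef.ask
class FakeDriverRef(registered: Set[String])(implicit ec: ExecutionContext) {
  def ask(msg: RegisterExecutor): Future[RegisterExecutorResponse] = Future {
    if (registered.contains(msg.executorId))
      RegisterExecutorFailed("Duplicate executor ID: " + msg.executorId)
    else
      RegisteredExecutor(msg.hostPort.takeWhile(_ != ':'))
  }
}

// Same shape as onStart: resolve the driver reference first, then ask it to register us
def register(lookup: Future[FakeDriverRef], msg: RegisterExecutor)
            (implicit ec: ExecutionContext): Future[RegisterExecutorResponse] =
  lookup.flatMap(driver => driver.ask(msg))

implicit val ec: ExecutionContext = ExecutionContext.global

val lookup = Future.successful(new FakeDriverRef(registered = Set("1")))
// onStart reacts with onComplete; Await is used here only to keep the demo deterministic
val reply = Await.result(register(lookup, RegisterExecutor("2", "host-a:7078", cores = 4)), 5.seconds)
println(reply) // RegisteredExecutor(host-a)
```

In the real onStart, a Success result is forwarded to the backend itself with `send`, and a Failure terminates the process with `System.exit(1)`.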
Note:
1. CoarseGrainedExecutorBackend is the name of the process in which the Executor runs; CoarseGrainedExecutorBackend does not compute tasks itself.
2. The Executor is the object that actually processes tasks; internally it computes Tasks with a thread pool.
3. CoarseGrainedExecutorBackend and Executor correspond one-to-one.
4. CoarseGrainedExecutorBackend is a message communicator: it receives messages from the Driver, sends messages to the Driver, and so on. It extends ThreadSafeRpcEndpoint.
CoarseGrainedExecutorBackend sends the RegisterExecutor message to the Driver, whose side is implemented by SparkDeploySchedulerBackend (renamed StandaloneSchedulerBackend after Spark 2.0). SparkDeploySchedulerBackend extends CoarseGrainedSchedulerBackend, and its start method launches an AppClient (renamed StandaloneAppClient after Spark 2.0). The source code of SparkDeploySchedulerBackend's start method:
override def start() {
super.start() // calls CoarseGrainedSchedulerBackend's start method
launcherBackend.connect()
// The endpoint for executors to talk to us
val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
  RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
val args = Seq(
  "--driver-url", driverUrl,
  "--executor-id", "{{EXECUTOR_ID}}",
  "--hostname", "{{HOSTNAME}}",
  "--cores", "{{CORES}}",
  "--app-id", "{{APP_ID}}",
  "--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
  .map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
  .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
  .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val testingClassPath = if (sys.props.contains("spark.testing")) {
  sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
  Nil
}
// Start executors with the configs they need to register with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start() // start the AppClient, which registers the application with the Master
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
The Driver process contains two important Endpoints:
ClientEndpoint: registers the current application with the Master; it is an inner class of AppClient.
DriverEndpoint: drives the whole running application. It receives the RegisterExecutor message and completes the Executor's registration on the Driver; it is an inner class of CoarseGrainedSchedulerBackend.
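Returning to start(): the executor arguments such as "{{EXECUTOR_ID}}" and "{{HOSTNAME}}" are only templates, because the concrete values are known only when a Worker launches the CoarseGrainedExecutorBackend process (in Spark's standalone mode the substitution happens on the Worker side, in ExecutorRunner). The sketch below shows that substitution step under simplifying assumptions; ExecutorSlot, substitute and every URL and ID in it are hypothetical.

```scala
// Hypothetical values a Worker knows at the moment it launches an executor
case class ExecutorSlot(workerUrl: String, executorId: Int, host: String, cores: Int, appId: String)

// Replace each placeholder argument with its concrete value; other arguments pass through unchanged
def substitute(arg: String, slot: ExecutorSlot): String = arg match {
  case "{{WORKER_URL}}"  => slot.workerUrl
  case "{{EXECUTOR_ID}}" => slot.executorId.toString
  case "{{HOSTNAME}}"    => slot.host
  case "{{CORES}}"       => slot.cores.toString
  case "{{APP_ID}}"      => slot.appId
  case other             => other
}

// Same argument layout as `args` in start(); the driver URL is a made-up example
val template = Seq(
  "--driver-url", "spark://CoarseGrainedScheduler@driver-host:7077",
  "--executor-id", "{{EXECUTOR_ID}}",
  "--hostname", "{{HOSTNAME}}",
  "--cores", "{{CORES}}",
  "--app-id", "{{APP_ID}}",
  "--worker-url", "{{WORKER_URL}}")

val slot = ExecutorSlot("spark://Worker@worker-1:35000", 3, "worker-1", 4, "app-20220622165700-0000")
val finalArgs = template.map(substitute(_, slot))
println(finalArgs.mkString(" "))
```

Keeping placeholders in the Command lets the Driver describe one launch command for the whole application while each Worker fills in its own executor-specific values.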
The Executor's RegisterExecutor message is submitted to DriverEndpoint, which writes the data into executorDataMap, a data structure inside CoarseGrainedSchedulerBackend. In this way CoarseGrainedSchedulerBackend knows every ExecutorBackend process allocated to the current application, and in each ExecutorBackend instance an Executor object executes the actual tasks. RegisterExecutor messages between the Executor side and CoarseGrainedSchedulerBackend are received and answered by the receiveAndReply method, which is very important. The RegisterExecutor registration flow in receiveAndReply:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
if (executorDataMap.contains(executorId)) { // reject an executorId that is already registered
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
} else {
// If executorRef.address is not null, use it as executorAddress
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else { // otherwise use the sender's address as executorAddress
context.senderAddress
}
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
cores, cores, logUrls)
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
}
}
context.reply(RegisteredExecutor(executorAddress.host))
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
}
case StopDriver =>
context.reply(true)
stop()
case StopExecutors =>
logInfo("Asking each executor to shut down")
for ((_, executorData) <- executorDataMap) {
executorData.executorEndpoint.send(StopExecutor)
}
context.reply(true)
case RemoveExecutor(executorId, reason) =>
removeExecutor(executorId, reason)
context.reply(true)
case RetrieveSparkProps =>
context.reply(sparkProperties)
}
The receiveAndReply method of CoarseGrainedSchedulerBackend shows how the RegisterExecutor message is handled, that is, the Executor registration process:
Check whether executorDataMap already contains the executorId. If it does, reply with RegisterExecutorFailed, because an Executor with the same executorId is already running.
Otherwise register the Executor and determine executorAddress: executorRef.address if it is not null, else the sender's address.
Update three data structures: addressToExecutorId (a DriverEndpoint data structure mapping an RPC address, i.e. host name and port, to an ExecutorId), totalCoreCount (the total number of cores in the cluster) and totalRegisteredExecutors (the total number of currently registered Executors). The latter two belong to CoarseGrainedSchedulerBackend.
Create an ExecutorData from executorRef, executorRef.address, the host name, the number of cores, and so on.
Put the ExecutorData into executorDataMap inside the synchronized block CoarseGrainedSchedulerBackend.this.synchronized (decrementing numPendingExecutors if any are pending). Many Executors in the cluster register with the Driver, so this update is placed in a synchronized block to prevent write conflicts.
Reply to the sender with RegisteredExecutor via context.reply(RegisteredExecutor(executorAddress.host)). The sender is CoarseGrainedExecutorBackend, which creates the Executor after it receives RegisteredExecutor; the Executor is what actually computes Tasks.
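The registration steps above can be modeled as a small bookkeeping class. This is a simplified sketch, assuming executors are identified by a string ID and an address is a (host, port) pair; DriverBookkeeping, ExecutorInfo, Registered and Failed are hypothetical names. Unlike the Spark code, which locks only the executorDataMap update, the sketch makes the whole register method synchronized for simplicity.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical, simplified counterpart of Spark's ExecutorData
case class ExecutorInfo(host: String, freeCores: Int, totalCores: Int)

sealed trait RegistrationReply
case class Registered(host: String) extends RegistrationReply
case class Failed(message: String) extends RegistrationReply

class DriverBookkeeping {
  private val executorDataMap = mutable.HashMap.empty[String, ExecutorInfo]
  private val addressToExecutorId = mutable.HashMap.empty[(String, Int), String]
  val totalCoreCount = new AtomicInteger(0)
  val totalRegisteredExecutors = new AtomicInteger(0)
  private var numPendingExecutors = 0

  // Executors requested from the cluster manager that have not registered yet
  def requestExecutors(n: Int): Unit = synchronized { numPendingExecutors += n }
  def pending: Int = synchronized { numPendingExecutors }

  // Mirrors the RegisterExecutor branch of receiveAndReply
  def register(executorId: String, host: String, port: Int, cores: Int): RegistrationReply =
    synchronized {
      if (executorDataMap.contains(executorId)) {
        Failed("Duplicate executor ID: " + executorId) // duplicate ID: refuse to register
      } else {
        addressToExecutorId((host, port)) = executorId // address -> executor ID
        totalCoreCount.addAndGet(cores)
        totalRegisteredExecutors.incrementAndGet()
        executorDataMap.put(executorId, ExecutorInfo(host, cores, cores)) // all cores start free
        if (numPendingExecutors > 0) numPendingExecutors -= 1
        Registered(host) // the reply sent back to the executor backend
      }
    }
}

val driver = new DriverBookkeeping
driver.requestExecutors(2)
val first = driver.register("1", "worker-1", 7078, cores = 4)
val again = driver.register("1", "worker-1", 7078, cores = 4)
println(s"$first, $again") // Registered(worker-1), Failed(Duplicate executor ID: 1)
```

The atomic counters can be read without the lock, while the plain maps and the pending counter are only touched inside `synchronized`, so concurrent registrations cannot lose updates.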
CoarseGrainedExecutorBackend's receive method handles the Driver's replies:
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor(hostname) =>
  logInfo("Successfully registered with driver")
  executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
case RegisterExecutorFailed(message) =>
  logError("Slave registration failed: " + message)
  System.exit(1)
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
......................
The Executor creates a thread pool (threadPool) and uses concurrent multithreaded execution and thread reuse to run the Tasks sent by Spark efficiently. Once the thread pool exists, the Executor waits for the Driver to send tasks. The Driver sends them to CoarseGrainedExecutorBackend rather than directly to the Executor, because the Executor is not a message loop body.
How does the Executor work?
When the Driver sends a Task, it actually sends it to CoarseGrainedExecutorBackend, which is an RpcEndpoint, not directly to the Executor (the Executor is not a message loop body and never receives messages directly from a remote endpoint).
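Tasks go through CoarseGrainedExecutorBackend because only an RPC endpoint has an inbox drained by a message loop; as a ThreadSafeRpcEndpoint, the backend handles its messages one at a time. The sketch below models that idea with a single-threaded loop over a blocking queue. MessageLoop is a hypothetical simplification, not Spark's Dispatcher or Inbox, and only illustrates why sequential handling makes the endpoint's own state safe to mutate.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical message loop: one inbox drained by one thread, so messages are handled in order
class MessageLoop[M](handler: M => Unit) {
  private val inbox = new LinkedBlockingQueue[Option[M]]()
  private val loopThread = new Thread(() => {
    var running = true
    while (running) {
      inbox.take() match {
        case Some(msg) => handler(msg)
        case None      => running = false // None acts as a "poison pill" that stops the loop
      }
    }
  })
  loopThread.start()

  // Fire-and-forget delivery, comparable in spirit to RpcEndpointRef.send
  def send(msg: M): Unit = inbox.put(Some(msg))

  // Stop after every previously sent message has been handled
  def stop(): Unit = {
    inbox.put(None)
    loopThread.join()
  }
}

// The handler mutates plain, unsynchronized state because only the loop thread calls it
val handled = ArrayBuffer.empty[String]
val backend = new MessageLoop[String](msg => handled += msg)
Seq("RegisteredExecutor", "LaunchTask(1)", "LaunchTask(2)").foreach(backend.send)
backend.stop()
println(handled.mkString(", ")) // RegisteredExecutor, LaunchTask(1), LaunchTask(2)
```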
The Driver sends LaunchTask to CoarseGrainedExecutorBackend, and the task is ultimately executed by a thread from the Executor's thread pool. The backend first checks whether the Executor is null: if it is, the process exits immediately; otherwise the backend deserializes the task description and calls the Executor's launchTask, submitting the task to the Executor.
When launchTask receives the task, it first wraps it in a TaskRunner, then puts the TaskRunner into runningTasks (a map from task ID to TaskRunner), and finally hands it to the thread pool for execution.
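The launchTask steps above (wrap in a TaskRunner, record it in runningTasks, hand it to the thread pool) can be sketched as follows. MiniExecutor and its fixed-size pool are hypothetical simplifications: the real Executor deserializes the task, reports status back through the ExecutorBackend, and creates its thread pool differently.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical, heavily simplified Executor: no deserialization and no status updates
class MiniExecutor(threads: Int) {
  private val threadPool = Executors.newFixedThreadPool(threads)
  // taskId -> TaskRunner for tasks that are currently running, like Executor.runningTasks
  val runningTasks = new ConcurrentHashMap[Long, TaskRunner]()
  // taskId -> result; stands in for sending the result back to the driver
  val results = new ConcurrentHashMap[Long, Int]()

  class TaskRunner(val taskId: Long, body: () => Int) extends Runnable {
    override def run(): Unit = {
      try results.put(taskId, body())
      finally runningTasks.remove(taskId) // the task is no longer running
      ()
    }
  }

  def launchTask(taskId: Long, body: () => Int): Unit = {
    val tr = new TaskRunner(taskId, body) // 1. wrap the task in a TaskRunner
    runningTasks.put(taskId, tr)          // 2. record it in runningTasks
    threadPool.execute(tr)                // 3. let a pooled thread run it
  }

  def shutdown(): Unit = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
    ()
  }
}

val executor = new MiniExecutor(threads = 4)
(1L to 8L).foreach(id => executor.launchTask(id, () => (id * id).toInt))
executor.shutdown()
println(executor.results.get(3L)) // 9
```

Registering the runner in runningTasks before calling execute guarantees that the removal in `finally` always finds an entry to remove.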
The run method of the Executor's TaskRunner ultimately calls Task.run. The part of TaskRunner.run that calls Task's run method:
............................
var threwException = true
val (value, accumUpdates) = try {
  val res = task.run(
    taskAttemptId = taskId,
    attemptNumber = attemptNumber,
    metricsSystem = env.metricsSystem)
  threwException = false
  res
}
...........................................
Source code of the run method of class Task:
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): (T, AccumulatorUpdates) = {
context = new TaskContextImpl(
  stageId, partitionId, taskAttemptId, attemptNumber, taskMemoryManager, metricsSystem,
  internalAccumulators, runningLocally = false)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
taskThread = Thread.currentThread()
if (_killed) {
  kill(interruptThread = false)
}
try {
  // Mainly calls the runTask method
  (runTask(context), context.collectAccumulators())
} finally {
  context.markTaskCompleted()
.................................
Task is an abstract class: its run method is concrete, while its runTask method is abstract. Task has two subclasses, ShuffleMapTask and ResultTask, and which subclass's runTask is executed depends on the actual task. The difference between the runTask methods of ShuffleMapTask and ResultTask is whether the task performs a shuffle, i.e. whether runTask performs shuffle write operations: ShuffleMapTask's runTask writes its output through a shuffle writer and returns a MapStatus, whereas ResultTask's runTask applies the function to the partition and returns the result.
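The split between a concrete, final run method and an abstract runTask method is the template-method pattern. The sketch below reproduces that shape with hypothetical SimpleTask, SimpleShuffleMapTask and SimpleResultTask classes: the map-side task "writes" its records into per-reducer buckets (an in-memory map standing in for shuffle files) and returns only a small status, while the result-side task applies a function to its partition and returns the value itself. None of these classes are Spark's.

```scala
import scala.collection.mutable

// Hypothetical task hierarchy mirroring Task's shape: run is concrete and final, runTask is abstract
abstract class SimpleTask[T](val partitionId: Int) {
  private var completed = false

  final def run(attemptNumber: Int): T = {
    try {
      runTask(attemptNumber) // subclasses decide what the task actually does
    } finally {
      completed = true       // stands in for context.markTaskCompleted()
    }
  }

  def isCompleted: Boolean = completed

  protected def runTask(attemptNumber: Int): T
}

// Like ResultTask: apply the user function to the partition and return the value itself
class SimpleResultTask[U](pid: Int, data: Seq[Int], func: Iterator[Int] => U)
    extends SimpleTask[U](pid) {
  protected def runTask(attemptNumber: Int): U = func(data.iterator)
}

// Like MapStatus: only a summary of what the map side wrote
case class MapStatusLike(mapPartition: Int, bucketSizes: Map[Int, Int])

// Like ShuffleMapTask: "write" records into per-reducer buckets, return only the status
class SimpleShuffleMapTask(pid: Int, data: Seq[Int], numReducers: Int,
                           store: mutable.Map[(Int, Int), Seq[Int]])
    extends SimpleTask[MapStatusLike](pid) {
  protected def runTask(attemptNumber: Int): MapStatusLike = {
    val buckets = data.groupBy(x => math.floorMod(x.hashCode, numReducers))
    buckets.foreach { case (reducer, records) => store((partitionId, reducer)) = records }
    MapStatusLike(partitionId, buckets.map { case (reducer, records) => reducer -> records.size })
  }
}

val store = mutable.Map.empty[(Int, Int), Seq[Int]]
val mapTask = new SimpleShuffleMapTask(0, Seq(1, 2, 3, 4, 5), numReducers = 2, store = store)
val status = mapTask.run(attemptNumber = 0)
val resultTask = new SimpleResultTask[Int](0, Seq(1, 2, 3), _.sum)
val total = resultTask.run(attemptNumber = 0)
println(total) // 6
```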