spark source code - task submission process - 5-CoarseGrainedExecutorBackend
2022-08-05 06:11:00 【zdaiqing】
CoarseGrainedExecutorBackend
1. Overview
In 4-spark源码-任务提交流程之container中启动executor we saw that after the AM obtains resources from the RM, it iterates over the allocated containers; the AM requests them from the NM, and in each container a /bin/java command starts an org.apache.spark.executor.CoarseGrainedExecutorBackend process.
This article analyzes the org.apache.spark.executor.CoarseGrainedExecutorBackend process.
2. Entry point
After the CoarseGrainedExecutorBackend process is created via /bin/java, execution enters through the process's main method.
main parses the arguments; if any required argument is missing, the JVM is terminated, otherwise the run method is called.
The CoarseGrainedExecutorBackend class extends the ThreadSafeRpcEndpoint trait, which in turn extends the RpcEndpoint trait.
private[spark] object CoarseGrainedExecutorBackend extends Logging {
def main(args: Array[String]) {
var driverUrl: String = null
var executorId: String = null
var hostname: String = null
var cores: Int = 0
var appId: String = null
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()
// parse the arguments
var argv = args.toList
while (!argv.isEmpty) {
argv match {
case ("--driver-url") :: value :: tail =>
driverUrl = value
argv = tail
case ("--executor-id") :: value :: tail =>
executorId = value
argv = tail
case ("--hostname") :: value :: tail =>
hostname = value
argv = tail
case ("--cores") :: value :: tail =>
cores = value.toInt
argv = tail
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
argv = tail
case ("--user-class-path") :: value :: tail =>
userClassPath += new URL(value)
argv = tail
case Nil =>
case tail =>
// scalastyle:off println
System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
// scalastyle:on println
printUsageAndExit()
}
}
if (hostname == null) {
hostname = Utils.localHostName()
log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
}
// missing required arguments: terminate the JVM
if (driverUrl == null || executorId == null || cores <= 0 || appId == null) {
printUsageAndExit()
}
// call the run method
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
}
}
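The `argv match { case flag :: value :: tail => ... }` idiom in main consumes the argument list two elements at a time. A minimal sketch of the same pattern (reusing two of the real flag names, with a simplified result type) can be run in isolation:

```scala
// A minimal sketch of main()'s list-pattern option parsing.
// Collecting into a Map is a simplification of the real vars.
def parse(args: List[String]): Map[String, String] = {
  var argv = args
  var opts = Map.empty[String, String]
  while (argv.nonEmpty) {
    argv match {
      case "--executor-id" :: value :: tail =>
        opts += ("executorId" -> value); argv = tail
      case "--cores" :: value :: tail =>
        opts += ("cores" -> value); argv = tail
      case tail =>
        sys.error(s"Unrecognized options: ${tail.mkString(" ")}")
    }
  }
  opts
}

println(parse(List("--executor-id", "1", "--cores", "4")))
```

Matching on `flag :: value :: tail` both validates that a value follows each flag and advances the cursor, which is why the real code rebinds `argv = tail` in every branch.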
3. run
run assembles the arguments, constructs the env, registers the CoarseGrainedExecutorBackend instance, and blocks the main thread until shutdown; from then on the dispatcher delivers messages to the instance.
private[spark] object CoarseGrainedExecutorBackend extends Logging {
private def run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL]) {
// daemon initialization
Utils.initDaemon(log)
// run as the spark user
SparkHadoopUtil.get.runAsSparkUser {
() =>
// validate the hostname format
Utils.checkHost(hostname)
// the executor's configuration
val executorConf = new SparkConf
// create an RPC environment for fetching driver properties
val fetcher = RpcEnv.create(
"driverPropsFetcher",
hostname,
-1,
executorConf,
new SecurityManager(executorConf),
clientMode = true)
// create a reference to the driver endpoint over RPC, from the --driver-url argument
val driver = fetcher.setupEndpointRefByURI(driverUrl)
// fetch the SparkAppConfig from the driver
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
// take the spark properties from the SparkAppConfig and add the spark application id
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
// shut down the fetcher RPC environment
fetcher.shutdown()
// wrap the configuration fetched from the driver into a SparkConf
val driverConf = new SparkConf()
for ((key, value) <- props) {
// this is required for SSL in standalone mode
if (SparkConf.isExecutorStartupConf(key)) {
driverConf.setIfMissing(key, value)
} else {
driverConf.set(key, value)
}
}
// add the delegation tokens obtained from the driver to the SparkConf
cfg.hadoopDelegationCreds.foreach {
tokens =>
SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
}
// create the executor's SparkEnv from the SparkConf settings
// this also initializes env.rpcEnv, assigning it a NettyRpcEnv instance
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
// construct a CoarseGrainedExecutorBackend instance and register it with the message dispatcher under the name "Executor"
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach {
url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
// block the main thread until rpcEnv exits: the state of the dispatcher's thread pool decides whether to keep blocking; this line is what keeps main alive
env.rpcEnv.awaitTermination()
}
}
}
3.1. Registering the backend with the message dispatcher
NettyRpcEnv is the implementation of RpcEnv.
setupEndpoint delegates to the dispatcher's registerRpcEndpoint method, analyzed next.
private[netty] class NettyRpcEnv(
val conf: SparkConf,
javaSerializerInstance: JavaSerializerInstance,
host: String,
securityManager: SecurityManager,
numUsableCores: Int) extends RpcEnv(conf) with Logging {
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
dispatcher.registerRpcEndpoint(name, endpoint)
}
}
3.1.1. Registering the RPC endpoint with the dispatcher
The RPC endpoint here is the CoarseGrainedExecutorBackend instance.
EndpointData encapsulates the endpoint information: name, endpoint, endpoint reference, and the bound inbox.
Inside the dispatcher, a ConcurrentHashMap caches name -> EndpointData entries; inserting into this map completes the endpoint registration.
The wrapped EndpointData is also offered to a LinkedBlockingQueue, the queue of receivers the dispatcher delivers messages to; it is effectively the dispatcher's recipient list.
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
// the endpoint's address identifier: nettyEnv.address is the endpoint's host and port, name is the endpoint's name
val addr = RpcEndpointAddress(nettyEnv.address, name)
// the RPC endpoint reference
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
// register the RPC endpoint (the CoarseGrainedExecutorBackend instance) with the dispatcher:
// the dispatcher's inner class EndpointData wraps the endpoint info, and a ConcurrentHashMap caches it by name
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
// cache the endpoint-to-reference mapping
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
// offer the wrapped endpoint info to the LinkedBlockingQueue of receivers that the dispatcher delivers to
receivers.offer(data) // for the OnStart message
}
endpointRef
}
}
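The duplicate-name check in registerRpcEndpoint relies on ConcurrentHashMap.putIfAbsent returning null only for the first writer of a key. A stripped-down registry shows the same pattern; the Registry class and its String "endpoints" are hypothetical stand-ins for the real types:

```scala
import java.util.concurrent.ConcurrentHashMap

// Stripped-down version of the dispatcher's name -> endpoint registration.
class Registry {
  private val endpoints = new ConcurrentHashMap[String, String]()
  def register(name: String, endpoint: String): String = {
    // putIfAbsent returns null only for the first writer of a key,
    // which makes the duplicate-name check atomic
    if (endpoints.putIfAbsent(name, endpoint) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    endpoints.get(name)
  }
}

val registry = new Registry
println(registry.register("Executor", "backend-1"))
val failed =
  try { registry.register("Executor", "backend-2"); false }
  catch { case _: IllegalArgumentException => true }
println(failed)
```

Because putIfAbsent is a single atomic operation, two threads racing to register the same name cannot both succeed, which a separate contains-then-put sequence would not guarantee.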
3.1.1.1 EndpointData, the endpoint information wrapper
EndpointData encapsulates the endpoint information: name, endpoint, endpoint reference, and the bound inbox.
The inbox stores messages for an RpcEndpoint and posts messages to it in a thread-safe way.
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
// bind an inbox to the endpoint
val inbox = new Inbox(ref, endpoint)
}
}
3.2. The rpcEnv blocking code
awaitTermination delegates to the dispatcher's awaitTermination method, analyzed next.
private[netty] class NettyRpcEnv(
val conf: SparkConf,
javaSerializerInstance: JavaSerializerInstance,
host: String,
securityManager: SecurityManager,
numUsableCores: Int) extends RpcEnv(conf) with Logging {
// the dispatcher field is initialized when NettyRpcEnv is instantiated, creating the message dispatcher
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
override def awaitTermination(): Unit = {
dispatcher.awaitTermination()
}
}
3.2.1. The dispatcher's blocking code
Blocking is delegated to the dispatcher's thread pool.
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
// the thread pool
private val threadpool: ThreadPoolExecutor = {
//.......
}
def awaitTermination(): Unit = {
// block on the thread pool
threadpool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
}
}
3.2.1.1 Thread pool initialization
When the dispatcher's thread pool is instantiated, it starts as many MessageLoop threads as the pool's thread limit allows; these threads deliver messages.
The pool is instantiated after the CoarseGrainedExecutorBackend process starts, while run creates the executor's SparkEnv from the SparkConf settings.
===> After the process starts, run creates the executor's SparkEnv; creating the SparkEnv initializes its rpcEnv field by instantiating a NettyRpcEnv; instantiating NettyRpcEnv initializes its dispatcher field; and instantiating the Dispatcher initializes its threadpool field. At that point the dispatcher's thread pool is up.
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
// the thread pool used to dispatch messages
// initialized when the Dispatcher is instantiated: from that moment on this code runs and the pool is at work
private val threadpool: ThreadPoolExecutor = {
// determine the number of threads in the pool
val availableCores =
if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
math.max(2, availableCores))
// initialize the thread pool
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
// start one MessageLoop thread per pool thread to deliver messages
for (i <- 0 until numThreads) {
pool.execute(new MessageLoop)
}
pool
}
}
3.2.1.1.1. MessageLoop, the message loop thread
The message loop thread class is an inner class of the dispatcher.
It polls the receivers queue and, for each endpoint taken from it, processes the messages in that endpoint's bound inbox.
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
private class MessageLoop extends Runnable {
override def run(): Unit = {
try {
while (true) {
try {
// take the next receiver (an endpoint with pending messages) from the receivers queue
val data = receivers.take()
// PoisonPill means the dispatcher is shutting down: exit the polling loop
if (data == PoisonPill) {
// Put PoisonPill back so that other MessageLoops can see it.
receivers.offer(PoisonPill)
return
}
// call the inbox's process method; the inbox enqueues an OnStart message when it is created, so that is handled first
data.inbox.process(Dispatcher.this)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case _: InterruptedException => // exit
case t: Throwable =>
try {
// Re-submit a MessageLoop so that Dispatcher will still work if
// UncaughtExceptionHandler decides to not kill JVM.
threadpool.execute(new MessageLoop)
} finally {
throw t
}
}
}
}
// a poison endpoint that indicates MessageLoop should exit its message loop
private val PoisonPill = new EndpointData(null, null, null)
}
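The PoisonPill handoff above is a standard pattern for shutting down multiple consumers of one LinkedBlockingQueue: whichever loop takes the pill puts it back before exiting, so every other loop eventually sees it too. A minimal sketch (plain strings stand in for EndpointData):

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}

// Minimal version of the MessageLoop shutdown handshake.
val receivers = new LinkedBlockingQueue[String]()
val PoisonPill = "POISON_PILL"
val processed = new ConcurrentLinkedQueue[String]()

def messageLoop(): Unit = {
  while (true) {
    val data = receivers.take()
    if (data == PoisonPill) {
      receivers.offer(PoisonPill) // put it back so the other loops see it
      return
    }
    processed.add(data)
  }
}

val loops = (1 to 2).map(_ => new Thread(() => messageLoop()))
loops.foreach(_.start())
receivers.offer("endpoint-A"); receivers.offer("endpoint-B")
receivers.offer(PoisonPill)
loops.foreach(_.join())
println(processed.size) // prints 2: both messages handled, both loops exited
```

Because the queue is FIFO, the pill is only taken after every message offered before it, so no pending work is lost on shutdown.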
3.2.1.1.2 Inbox.process, the inbox message handling logic
Inbox instantiation: when EndpointData wraps the endpoint information, it instantiates an Inbox to initialize its inbox field.
When an Inbox is instantiated, it initializes a message queue for buffering, and immediately adds an OnStart message to that queue.
When the dispatcher is instantiated, its threadpool field is initialized and one MessageLoop thread per pool thread starts running; each thread's run method calls the endpoint inbox's process method, which handles the inbox's first message, namely OnStart.
Handling OnStart invokes CoarseGrainedExecutorBackend's onStart method and, if the endpoint is not a ThreadSafeRpcEndpoint, switches on concurrent message processing (CoarseGrainedExecutorBackend is thread-safe, so for it the switch stays off).
private[netty] case object OnStart extends InboxMessage
private[netty] class Inbox(
val endpointRef: NettyRpcEndpointRef,
val endpoint: RpcEndpoint)
extends Logging {
inbox => // Give this an alias so we can use it more clearly in closures.
// the message queue: buffers messages in queue form
@GuardedBy("this")
protected val messages = new java.util.LinkedList[InboxMessage]()
// whether multiple threads may handle messages at the same time
@GuardedBy("this")
private var enableConcurrent = false
// the number of threads currently processing this inbox
@GuardedBy("this")
private var numActiveThreads = 0
// OnStart is added first and processed first; this block runs when the Inbox is instantiated
inbox.synchronized {
messages.add(OnStart)
}
/** Process stored messages. */
def process(dispatcher: Dispatcher): Unit = {
// the message to process
var message: InboxMessage = null
inbox.synchronized {
if (!enableConcurrent && numActiveThreads != 0) {
return
}
// take a message from the message queue
message = messages.poll()
if (message != null) {
// one more thread is processing messages: numActiveThreads + 1
numActiveThreads += 1
} else {
return
}
}
// loop forever, ready to pick up new messages at any time
while (true) {
// per the earlier code, endpoint here is a CoarseGrainedExecutorBackend instance
safelyCall(endpoint) {
message match {
case RpcMessage(_sender, content, context) =>
try {
// invoke CoarseGrainedExecutorBackend's receiveAndReply method
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, {
msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case e: Throwable =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
throw e
}
case OneWayMessage(_sender, content) =>
// invoke CoarseGrainedExecutorBackend's receive method
endpoint.receive.applyOrElse[Any, Unit](content, {
msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
// the first message process() handles is OnStart, so this branch runs before any other message is handled
case OnStart =>
// invoke CoarseGrainedExecutorBackend's onStart method
endpoint.onStart()
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
// allow multiple threads to process messages
enableConcurrent = true
}
}
}
case OnStop =>
val activeThreads = inbox.synchronized { inbox.numActiveThreads }
assert(activeThreads == 1,
s"There should be only a single active thread but found $activeThreads threads.")
// remove the endpoint's registration from the message dispatcher
dispatcher.removeRpcEndpointRef(endpoint)
// invoke CoarseGrainedExecutorBackend's onStop method
endpoint.onStop()
assert(isEmpty, "OnStop should be the last message")
case RemoteProcessConnected(remoteAddress) =>
endpoint.onConnected(remoteAddress)
case RemoteProcessDisconnected(remoteAddress) =>
endpoint.onDisconnected(remoteAddress)
case RemoteProcessConnectionError(cause, remoteAddress) =>
endpoint.onNetworkError(cause, remoteAddress)
}
}
inbox.synchronized {
// "enableConcurrent" will be set to false after `onStop` is called, so we should check it
// every time.
if (!enableConcurrent && numActiveThreads != 1) {
// If we are not the only one worker, exit
numActiveThreads -= 1
return
}
message = messages.poll()
if (message == null) {
numActiveThreads -= 1
return
}
}
}
}
}
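Both the RpcMessage and OneWayMessage branches above route the payload through PartialFunction.applyOrElse: the endpoint only declares the messages it understands, and everything else falls into the "Unsupported message" handler. The mechanism in isolation (OnStartMsg and LaunchTaskMsg are made-up message types for illustration):

```scala
// How receive/receiveAndReply dispatch works: the endpoint supplies a
// PartialFunction over Any, and applyOrElse supplies the fallback
// for anything the function does not cover.
sealed trait Msg
case object OnStartMsg extends Msg
final case class LaunchTaskMsg(taskId: Long) extends Msg

val receive: PartialFunction[Any, String] = {
  case OnStartMsg            => "started"
  case LaunchTaskMsg(taskId) => s"launching task $taskId"
}

def dispatch(content: Any): String =
  receive.applyOrElse[Any, String](content, msg => s"Unsupported message $msg")

println(dispatch(LaunchTaskMsg(42L))) // launching task 42
println(dispatch("something else"))   // Unsupported message something else
```

This is why receive in the real code returns PartialFunction[Any, Unit]: unmatched messages do not crash the loop with a MatchError, they reach a handler the dispatcher controls.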
3.2.1.2. The thread pool's blocking logic
awaitTermination repeatedly checks whether runState has reached the final TERMINATED state. If it has, it returns true; if not, it calls termination.awaitNanos(nanos) to block for a while, re-checks on waking, returns true if runState is now TERMINATED, and returns false once the timeout is used up.
See ThreadPoolExecutor源码解读(三)——如何优雅的关闭线程池(shutdown、shutdownNow、awaitTermination).
public class ThreadPoolExecutor extends AbstractExecutorService {
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
}
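The contract this gives the dispatcher: awaitTermination can only return true once something has called shutdown() and the pool has drained to TERMINATED; until then the call simply blocks (or times out). A small demonstration against a plain JDK pool:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// awaitTermination only returns true once the pool reaches TERMINATED,
// which cannot happen before shutdown() is called.
val pool = Executors.newFixedThreadPool(2)
pool.execute(() => Thread.sleep(100)) // some in-flight work

// The pool is still RUNNING, so this can only time out:
val beforeShutdown = pool.awaitTermination(10, TimeUnit.MILLISECONDS)
println(beforeShutdown) // false

pool.shutdown() // stop accepting new tasks; let in-flight work drain
val afterShutdown = pool.awaitTermination(5, TimeUnit.SECONDS)
println(afterShutdown) // true, once the state machine reaches TERMINATED
```

In the dispatcher's case the timeout is Long.MaxValue milliseconds, so the call is effectively "block until the dispatcher is stopped", which is exactly what keeps the backend's main thread alive.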
4. Registering the executor
4.1. The backend sends a registration message to the driver
In the backend's onStart method, the backend sends a message to the driver to register the executor.
private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
// asynchronously obtain the driver endpoint reference from driverUrl
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap {
ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
// ask the driver, via its endpoint reference, to register the executor
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
}
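The two-stage shape of onStart, resolving the ref asynchronously, flatMap-ing into the ask, then branching on Success/Failure in onComplete, can be reproduced with plain Scala futures. lookupDriver and ask below are hypothetical stand-ins for asyncSetupEndpointRefByURI and ask[Boolean], and the global execution context replaces ThreadUtils.sameThread:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Hypothetical stand-ins for asyncSetupEndpointRefByURI and ask[Boolean].
def lookupDriver(url: String): Future[String] = Future(s"ref:$url")
def ask(ref: String, message: String): Future[Boolean] = Future(true)

// Resolve the driver ref, then flatMap into the registration ask.
val registration: Future[Boolean] =
  lookupDriver("spark://CoarseGrainedScheduler@driver:7077").flatMap { ref =>
    ask(ref, "RegisterExecutor")
  }

registration.onComplete {
  case Success(_) => println("registered") // the driver always replies true
  case Failure(e) => println(s"Cannot register with driver: $e")
}

println(Await.result(registration, 5.seconds))
```

flatMap chains the two asynchronous steps without blocking a thread between them; a failure at either stage short-circuits straight to the Failure branch, which in the real code calls exitExecutor.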
4.2. The driver handles the backend's executor registration message
4.2.1. Driver endpoint registration logic
After spark-submit launches the application and the driver thread starts, that thread registers the driver endpoint with the rpcEnv.
spark-submit runs through a series of steps that start a driver thread [see spark源码-任务提交流程之ApplicationMaster]; this thread executes the user application class's main method and the application's subsequent logic.
Early in that execution, SparkContext is instantiated; instantiation initializes the object's fields, including the _schedulerBackend variable [see Spark源码-sparkContext初始化].
For the initialization logic of _schedulerBackend see [Spark源码-sparkContext初始化之TaskScheduler任务调度器]; there you can see that _schedulerBackend is an instance of the StandaloneSchedulerBackend class, and the source shows StandaloneSchedulerBackend is an implementation of CoarseGrainedSchedulerBackend.
After sparkContext#_schedulerBackend and sparkContext#_taskScheduler are initialized, _taskScheduler.start() is called to start the task scheduler. The code is as follows:
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
isLocal: Boolean = false)
extends TaskScheduler with Logging {
override def start() {
// call the backend's start method
backend.start()
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
}
In the task scheduler's start method, backend.start() is called. The article traces the StandaloneSchedulerBackend#start() implementation below (note: in spark on yarn-cluster mode the backend is actually a YarnSchedulerBackend subclass, but it too extends CoarseGrainedSchedulerBackend, so the driver endpoint registration path is the same):
private[spark] class StandaloneSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
masters: Array[String])
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with StandaloneAppClientListener
with Logging {
override def start() {
// call the parent class's start method
super.start()
//...... other code
}
}
The first thing StandaloneSchedulerBackend#start() does is call the parent class's CoarseGrainedSchedulerBackend#start() method.
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
extends ExecutorAllocationClient with SchedulerBackend with Logging {
var driverEndpoint: RpcEndpointRef = null
override def start() {
val properties = new ArrayBuffer[(String, String)]
for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.")) {
properties += ((key, value))
}
}
// initialize the driver endpoint and register it with the rpcEnv
driverEndpoint = createDriverEndpointRef(properties)
}
protected def createDriverEndpointRef(
properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
// register the endpoint with the rpcEnv
rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}
// construct the driver endpoint
protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new DriverEndpoint(rpcEnv, properties)
}
}
After the driver endpoint is registered with the rpcEnv, a MessageLoop thread from the dispatcher's pool runs the driver endpoint's bound inbox's process() method, which calls the DriverEndpoint's onStart() method:
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
extends ExecutorAllocationClient with SchedulerBackend with Logging {
// the driver's revive thread pool
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
// onStart pulls up a timer on the revive thread pool that periodically offers tasks to executors
override def onStart() {
// Periodically revive offers to allow delay scheduling to work
val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
// send a ReviveOffers message to the driver endpoint, which assigns tasks to executors
Option(self).foreach(_.send(ReviveOffers))
}
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
}
}
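The periodic ReviveOffers above is nothing more than scheduleAtFixedRate on a single-thread scheduled executor. A sketch of that timer shape, with a CountDownLatch standing in for self.send(ReviveOffers) so the ticks can be observed:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Sketch of the driver's revive timer: a scheduled executor firing at a
// fixed rate; each tick stands in for self.send(ReviveOffers).
val reviveThread = Executors.newSingleThreadScheduledExecutor()
val revives = new CountDownLatch(3)

reviveThread.scheduleAtFixedRate(() => revives.countDown(), 0, 50, TimeUnit.MILLISECONDS)

val firedThreeTimes = revives.await(5, TimeUnit.SECONDS)
reviveThread.shutdownNow()
println(firedThreeTimes) // true: the timer fired at least three times
```

The initial delay of 0 mirrors the real code's behavior of making a first offer immediately, with subsequent offers every spark.scheduler.revive.interval (1s by default).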
4.2.2 Handling the backend's executor registration message
When the driver endpoint DriverEndpoint receives the backend's ask message registering the executor, DriverEndpoint#receiveAndReply handles it.
Executors that are already registered or blacklisted are not registered; a RegisterExecutorFailed message (a OneWayMessage, sent via send) goes back to the executor endpoint. In all other cases registration completes and a RegisteredExecutor message (also a OneWayMessage, sent via send) goes to the executor endpoint.
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
extends ExecutorAllocationClient with SchedulerBackend with Logging {
private val executorDataMap = new HashMap[String, ExecutorData]
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
// handle RegisterExecutor messages
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
// an already-registered executor is not registered again; RegisterExecutorFailed is returned to the executor endpoint
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
}
// a blacklisted node is not registered; RegisterExecutorFailed is returned to the executor endpoint
else if (scheduler.nodeBlacklist.contains(hostname)) {
logInfo(s"Rejecting $executorId as it has been blacklisted.")
executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
// cache the executor's information
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val data = new ExecutorData(executorRef, executorAddress, hostname,
cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
// cache the executor info in a HashMap, completing the executor's registration on the driver endpoint
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
// after registration completes, send a RegisteredExecutor message to the executor endpoint
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
// the driver assigns tasks to executors
makeOffers()
}
//...... other code
}
}
}
4.2.3. The driver assigns tasks to executors
After the driver registers the executor, it calls DriverEndpoint#makeOffers to assign tasks to executors.
It obtains the task list from the scheduler, then iterates over the tasks, looks up the executor node chosen for each task, and assigns the task to that node.
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
extends ExecutorAllocationClient with SchedulerBackend with Logging {
private val executorDataMap = new HashMap[String, ExecutorData]
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
val taskDescs = withLock {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort))
}.toIndexedSeq
// obtain the task list
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
// assign the tasks
launchTasks(taskDescs)
}
}
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
// iterate over the task list
for (task <- tasks.flatten) {
// serialize the task
val serializedTask = TaskDescription.encode(task)
// the serialized task must not exceed the maximum rpc message size, otherwise the task is aborted
if (serializedTask.limit() >= maxRpcMessageSize) {
Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach {
taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
"spark.rpc.message.maxSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
// look up the executor node chosen for the task
val executorData = executorDataMap(task.executorId)
// launching a task subtracts one CPU from its executor; by default a task uses one CPU core
executorData.freeCores -= scheduler.CPUS_PER_TASK
logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
  s"${executorData.executorHost}.")
// assign the task to the executor node: send a LaunchTask message (a OneWayMessage) to the executor
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
}
}
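Strip away the serialization check and launchTasks is simple core accounting: each launched task subtracts CPUS_PER_TASK from the chosen executor's freeCores. A toy version; ExecutorData here is a hypothetical simplification of Spark's class of the same name:

```scala
import scala.collection.mutable

// Toy mirror of launchTasks' core bookkeeping.
final case class ExecutorData(executorHost: String, var freeCores: Int)
val CPUS_PER_TASK = 1
val executorDataMap = mutable.HashMap(
  "1" -> ExecutorData("node-a", freeCores = 4),
  "2" -> ExecutorData("node-b", freeCores = 2))

// Launching a task on an executor consumes CPUS_PER_TASK of its free cores.
def launchTaskOn(executorId: String): Unit =
  executorDataMap(executorId).freeCores -= CPUS_PER_TASK

launchTaskOn("1"); launchTaskOn("1"); launchTaskOn("2")
println(executorDataMap("1").freeCores) // 2
println(executorDataMap("2").freeCores) // 1
```

This is also why makeOffers builds WorkerOffer objects from freeCores first: the scheduler only receives offers for capacity that has not already been spent on running tasks.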
4.3. The backend receives the driver's reply to executor registration
After the driver processes the executor registration, it sends a OneWayMessage-type message to the executor endpoint; OneWayMessage messages are handled by CoarseGrainedExecutorBackend#receive().
Once registration on the driver side succeeds, the backend constructs an executor.
private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
var executor: Executor = null
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
// registration with the driver succeeded: construct an Executor
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
case RegisterExecutorFailed(message) =>
exitExecutor(1, "Slave registration failed: " + message)
//······· other message handling
}
}
5. Task handling
5.1 Launching a task
Handled by CoarseGrainedExecutorBackend#receive(), in the branch matching the LaunchTask message.
private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
var executor: Executor = null
override def receive: PartialFunction[Any, Unit] = {
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
// launch the task on the executor
executor.launchTask(this, taskDesc)
}
//······· other message handling
}
}
5.2 Executing a task
A task thread is started on the executor, handed to the executor's thread pool for execution, and recorded in the executor's running-task map.
private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false,
uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
extends Logging {
// the map of running task threads
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
// the task execution thread pool
private val threadPool = {
val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Executor task launch worker-%d")
.setThreadFactory(new ThreadFactory {
override def newThread(r: Runnable): Thread =
// Use UninterruptibleThread to run tasks so that we can allow running codes without being
// interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
// will hang forever if some methods are interrupted.
new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
})
.build()
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
// create a task thread (TaskRunner) on the executor
val tr = new TaskRunner(context, taskDescription)
// record the new thread in the executor's running-task map
runningTasks.put(taskDescription.taskId, tr)
// execute the task thread on the thread pool
threadPool.execute(tr)
}
}
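launchTask boils down to: wrap the work in a Runnable, record it in runningTasks, hand it to a cached thread pool. A minimal mirror of that flow (the cleanup-on-completion step is borrowed from what TaskRunner does when a task finishes):

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Minimal mirror of Executor.launchTask: record the runner in
// runningTasks, then hand it to a cached thread pool.
val runningTasks = new ConcurrentHashMap[Long, Runnable]()
val threadPool = Executors.newCachedThreadPool()

def launchTask(taskId: Long)(body: => Unit): Unit = {
  val runner: Runnable = () => {
    try body finally runningTasks.remove(taskId) // TaskRunner-style cleanup
  }
  runningTasks.put(taskId, runner)
  threadPool.execute(runner)
}

launchTask(1L) { println("task 1 ran") }
threadPool.shutdown()
threadPool.awaitTermination(5, TimeUnit.SECONDS)
println(runningTasks.isEmpty) // true once the task has completed
```

A cached pool fits here because the number of concurrent tasks is bounded upstream by the driver's core accounting, so the executor can simply grow threads on demand.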
6. Summary
The backend and the driver communicate over the RPC mechanism.
After the backend process starts:
it first registers the backend endpoint with the rpcEnv;
it then registers the executor with the driver; once the driver has registered the executor, it replies to the backend and assigns tasks to the executor;
on receiving the driver's successful-registration reply, the backend constructs an Executor instance;
the backend then handles the tasks the driver assigned:
it calls the Executor to start a task thread on the executor, hands the thread to the executor's thread pool, and records it in the executor's running-task map.
7. References
4-spark源码-任务提交流程之container中启动executor
Spark内核之YARN Cluster模式源码详解(Submit详解)
Spark源码——Spark on YARN Executor执行Task的过程
ThreadPoolExecutor源码解读(三)——如何优雅的关闭线程池(shutdown、shutdownNow、awaitTermination)
Spark2.0.2源码分析——RPC 通信机制(消息处理)