Spark streaming receiver startup and data receiving
2022-06-22 16:57:00 【ZH519080】
Develop a habit of taking notes!!!

A small example:
def fromsocket(ssc: StreamingContext): Unit = {
  // updateStateByKey requires a checkpoint directory to be set first
  ssc.checkpoint("hdfs://zhumaster:8020/data/checkpoint")
  // When using a socket as the input source, run `nc -lk <port>` first to open the port.
  // The port used here is 9999 (any free port works); check whether it is open with `netstat -ap | grep 9999`.
  val soDS: ReceiverInputDStream[String] = ssc.socketTextStream("zhumaster", 9999)
  val wordCount: DStream[(String, Int)] = soDS.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
  wordCount.foreachRDD { rdd =>
    // Filter out empty RDDs so that no job is submitted for an empty batch;
    // a job is only submitted when data has actually been received.
    if (!rdd.isEmpty()) rdd.foreach(println)
  }
  wordCount.print()
  ssc.start()
  ssc.awaitTermination()
  ssc.stop(true)
}

StreamingContext still uses SparkContext as the entry point into Spark:
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}

StreamingContext's start method mainly calls JobScheduler's start method. When the JobScheduler object starts, it launches the EventLoop, StreamingListenerBus, ReceiverTracker, and JobGenerator objects. JobGenerator is responsible for generating jobs (each batch produces a concrete RDD DAG); JobScheduler is responsible for job scheduling, playing a role analogous to DAGScheduler in Spark Core; ReceiverTracker is responsible for receiving data.
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started
  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start() // Start the message-loop thread that processes JobScheduler events
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)
  listenerBus.start(ssc.sparkContext) // Start the listener bus, which updates the Streaming tab of the Spark UI
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc) // Tracks all input streams and their input-data statistics
  receiverTracker.start() // Handles data reception, data caching, and block generation
  jobGenerator.start() // Handles DStreamGraph initialization, DStream-to-RDD conversion, job generation and submission, etc.
  logInfo("Started JobScheduler")
}

This post records how the Receiver is started and how data is received.
The Receiver continuously receives data from the external data source and keeps reporting the metadata of the received data to the Driver. Every batchDuration, jobs are generated from the reported data and the corresponding RDD transformations are executed.
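To make the Receiver side concrete, here is a minimal sketch of a custom receiver built on the public Receiver API (onStart, onStop, store, restart); the SimpleSocketReceiver class and its host/port parameters are made up for illustration and are not part of the original post:

import java.io.BufferedReader
import java.io.InputStreamReader
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A minimal custom receiver: reads lines from a socket and hands them to Spark via store().
// store() passes each record to the ReceiverSupervisor, which assembles records into blocks
// and saves them through the BlockManager.
class SimpleSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Start a thread that receives data; onStart must not block.
    new Thread("Simple Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to do: the receiving thread exits when isStopped() returns true.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // hand the record to the supervisor / BlockManager
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}

Such a receiver would be plugged in with ssc.receiverStream(new SimpleSocketReceiver("zhumaster", 9999)), which produces a ReceiverInputDStream just like socketTextStream does.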
Data reception starts with ReceiverTracker's start method. Receivers are started on Executors to continuously receive stream data. The received data is discrete, so it has to be assembled into blocks and saved by the BlockManager, and the metadata of the saved blocks is reported back to the Driver. The first thing ReceiverTracker's start method decides is whether any InputStreams exist; it then uses launchReceivers to kick off the related processing on the Executors:
def start(): Unit = synchronized {
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint("ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

The thread that starts the Receiver first checks whether receiverInputStreams is non-empty; Receivers are only started when it is not. receiverInputStreams holds the ReceiverInputDStreams registered with the DStreamGraph, so whether a Receiver is started at all depends on the input source.
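For reference, this is roughly how the socketTextStream call from the example above ends up in receiverInputStreams (a paraphrased sketch, not a verbatim copy of the Spark source; details vary by version): the factory method builds a SocketInputDStream, and every InputDStream registers itself with the DStreamGraph in its constructor.

// Paraphrased sketch of StreamingContext.socketTextStream
def socketTextStream(hostname: String, port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] = {
  // SocketInputDStream is a ReceiverInputDStream whose getReceiver() returns a SocketReceiver
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

// In InputDStream's constructor, every input stream adds itself to the graph,
// which is where receiverInputStreams is later collected from:
//   ssc.graph.addInputStream(this)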
The launchReceivers method gathers the receivers from the input streams and sends them, via the endpoint, to executors on the worker nodes. Inside it, ReceiverTracker's runDummySparkJob method is called; its purpose is to make sure that executors on all nodes have registered, so that the receivers can be spread across them.
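For context, runDummySparkJob looks roughly like the following (a paraphrased sketch, not a verbatim copy of the Spark source; details may differ between versions): it runs a throwaway job across many partitions so that, in non-local mode, executors have come up and registered before receivers are scheduled.

// Paraphrased sketch of ReceiverTracker.runDummySparkJob
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    // A trivial job across many partitions forces executors to register with the driver.
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}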
Source code of ReceiverTracker's launchReceivers method:

private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  runDummySparkJob() // Prevents all receivers from being assigned to the same slave node, ensuring load balancing
  endpoint.send(StartAllReceivers(receivers)) // endpoint here is in fact the ReceiverTrackerEndpoint
}

How ReceiverTrackerEndpoint handles the StartAllReceivers message, from its source:
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  ......

A scheduling policy assigns each receiver its executors. Scheduling the receivers is not delegated to the Spark core; the Spark Streaming framework does the scheduling itself, so that the core does not treat a Receiver as an ordinary job and end up scheduling multiple Receivers onto the same node.
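As a rough illustration of the idea (a simplified, hypothetical stand-in, not the actual ReceiverSchedulingPolicy, which also honors preferredLocation and balances by current load), distributing receivers over executors round-robin is already enough to avoid piling every receiver onto one node:

// Simplified, hypothetical round-robin assignment of receivers to executors.
// The real ReceiverSchedulingPolicy also considers receiver.preferredLocation and
// how many receivers each executor already has.
def naiveScheduleReceivers(receiverIds: Seq[Int], executors: Seq[String]): Map[Int, Seq[String]] = {
  if (executors.isEmpty) {
    receiverIds.map(id => id -> Seq.empty[String]).toMap
  } else {
    receiverIds.zipWithIndex.map { case (id, i) =>
      id -> Seq(executors(i % executors.length)) // round-robin over the available executors
    }.toMap
  }
}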
ReceiverTracker's startReceiver method is more involved: it starts the receiver on the planned executors, and the RDD it builds from scheduledLocations is executed as a Spark job. Source code of ReceiverTracker's startReceiver method:
private def startReceiver(receiver: Receiver[_], scheduledLocations: Seq[TaskLocation]): Unit = {
  ......
  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
  // Function run on the worker to start the Receiver: it calls ReceiverSupervisorImpl's start
  // to launch the supervisor on that node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      // When startReceiverFunc is invoked on the executor, the ReceiverSupervisor is started
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      }
    }
  ......
  // SparkContext's makeRDD creates an RDD that holds a single element: the Receiver object
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1) // only one Receiver
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
  // SparkContext.submitJob: Streaming starts a dedicated job for each Receiver, instead of having
  // the receiver job submitted and executed through a Streaming output (action) operation
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
}

ReceiverSupervisorImpl inherits from ReceiverSupervisor, and ReceiverSupervisor's startReceiver method is called next. It first calls the onReceiverStart method, which registers the Receiver with the ReceiverTracker. If registration succeeds, the Receiver's onStart is called on the Executor to start the Receiver and keep receiving data, and the received data is handed over to the BlockManager for management; at this point the Receiver has been started.
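The register-then-start logic described above lives in ReceiverSupervisor.startReceiver; a paraphrased sketch (not a verbatim copy of the Spark source, and details vary by version) looks like this:

// Paraphrased sketch of ReceiverSupervisor.startReceiver
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) { // register the Receiver with the ReceiverTracker on the driver
      logInfo(s"Starting receiver $streamId")
      receiverState = Started
      receiver.onStart() // the receiver begins pulling data and calling store()
      logInfo(s"Called receiver $streamId onStart")
    } else {
      // The driver refused to register this receiver, so stop it
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}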
The Receiver's manager is the ReceiverSupervisor. The received receivers are iterated over: each Receiver corresponds to one ReceiverSupervisorImpl, and each ReceiverSupervisorImpl to one ReceiverSupervisor; in other words, one ReceiverSupervisor starts and manages one Receiver. Calling ReceiverSupervisorImpl's start invokes ReceiverSupervisor's start method, which starts the ReceiverSupervisor. Its source:

def start() {
  onStart()
  startReceiver()
}

Looking back at ReceiverTracker's startReceiver method: when the job corresponding to a Receiver completes, whether it succeeded or not, as long as the ReceiverTracker has not been stopped, a RestartReceiver message is sent to the ReceiverTrackerEndpoint to restart the Receiver.
Summary: on the Driver side, the ReceiverTracker manages the Receiver tasks on all Executors. It contains a ReceiverTrackerEndpoint message endpoint, which in the startReceiver method submits each Receiver's job to run on a specific Executor and receives the messages sent back from the Executor side (such as the result of Receiver registration). On the Executor side there is a ReceiverSupervisor dedicated to managing the Receiver; it is responsible for registering and starting the Receiver and for exchanging information with the ReceiverTracker.