Spark Streaming: Receiver Startup and Data Reception
Get into the habit of taking notes!!!
A small example:
def fromsocket(ssc: StreamingContext): Unit = {
  /** A checkpoint directory must be set before using stateful operations such as updateStateByKey */
  ssc.checkpoint("hdfs://zhumaster:8020/data/checkpoint")
  /** When a network socket is used as the input source, run `nc -lk <port>` to open a listening port.
    * Port 9999 is used here; any free port may be chosen.
    * Use `netstat -ap | grep 9999` to check the status of port 9999. */
  val soDS: ReceiverInputDStream[String] = ssc.socketTextStream("zhumaster", 9999)
  val wordCount: DStream[(String, Int)] = soDS.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
  wordCount.foreachRDD { rdd =>
    /** Skip empty RDDs so that no job is submitted for an empty batch;
      * a job is only submitted when data has actually been received. */
    if (!rdd.isEmpty()) rdd.foreach(println)
  }
  wordCount.print()
  ssc.start()
  ssc.awaitTermination()
  ssc.stop(true)
}

StreamingContext still uses SparkContext as Spark's entry point, as the constructor shown after the following sketch illustrates.
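As a quick usage illustration, the function above could be driven roughly as follows. This is a minimal sketch: the object name, master URL, application name and batch interval are assumptions for illustration, not taken from the original, and fromsocket is assumed to be in scope.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCountApp {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread runs the Receiver, the other processes the received blocks;
    // on a real cluster the master would normally be supplied by spark-submit instead
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCountApp")
    val ssc = new StreamingContext(conf, Seconds(5))
    fromsocket(ssc) // blocks in awaitTermination until the streaming job is stopped
  }
}

And the constructor itself: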
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}

StreamingContext's start method mainly calls JobScheduler's start method. When the JobScheduler object starts, it starts the EventLoop, StreamingListenerBus, ReceiverTracker, and JobGenerator objects. JobGenerator is responsible for generating jobs (turning each batch into a concrete RDD DAG); JobScheduler is responsible for job scheduling, playing a role comparable to DAGScheduler in Spark Core; ReceiverTracker is responsible for acquiring the data.
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start() // start the message loop thread that handles JobScheduler events

  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext) // start the listener bus that updates the Streaming tab of the Spark UI
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc) // tracks all input streams and their input statistics
  receiverTracker.start() // handles data reception, data buffering, and block generation
  jobGenerator.start() // handles DStreamGraph initialization, DStream-to-RDD translation, and job generation and submission
  logInfo("Started JobScheduler")
}

The rest of this note records how the Receiver is started and how data is received.
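As background, the EventLoop created above is essentially a daemon thread that drains a queue of posted events. The sketch below is a simplified paraphrase of Spark's org.apache.spark.util.EventLoop, not the exact source; details vary between versions.

import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean

import scala.util.control.NonFatal

// Simplified paraphrase of Spark's EventLoop: a named daemon thread takes events
// off a queue and dispatches them to onReceive; failures go to onError.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // interrupted by stop(), just exit
      }
    }
  }

  def start(): Unit = eventThread.start()          // what JobScheduler.start triggers
  def post(event: E): Unit = eventQueue.put(event) // e.g. job start/completion events are posted here
  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

In the JobScheduler code above, processEvent plays the role of onReceive.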
The Receiver keeps receiving data flowing in from the external source and continuously reports the metadata of the received data to the Driver; every batchDuration, jobs are generated from the reported data and the corresponding RDD transformation operations are executed.
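To make the Receiver contract concrete, here is a sketch modeled on the custom-receiver pattern from the Spark Streaming programming guide; the class name and connection details are illustrative assumptions. onStart must return quickly, a worker thread keeps pulling records, and each record is handed to the framework through store():

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative custom Receiver, modeled on the custom-receiver example
// in the Spark Streaming programming guide.
class CustomSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // onStart must return quickly, so the blocking read loop runs in its own thread
    new Thread("Custom Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // nothing to do: the receiving thread checks isStopped() and exits on its own
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line) // hand the record to the framework (ReceiverSupervisor -> BlockManager)
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again") // ask the supervisor to restart this receiver
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}

Such a receiver would be plugged in with ssc.receiverStream(new CustomSocketReceiver(host, port)) instead of socketTextStream; everything that follows about ReceiverTracker and ReceiverSupervisor applies to it in the same way.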
Data reception starts from ReceiverTracker's start method. Receivers are started on Executors to receive the streaming data continuously. The received records are discrete, so they must be collected into blocks, saved by the BlockManager, and the metadata of the saved blocks reported back to the Driver. ReceiverTracker's start method first checks whether any InputStreams exist, and then uses launchReceivers to start the corresponding work on the Executors:
def start(): Unit = synchronized {
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint("ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

The thread that starts the Receivers first checks whether receiverInputStreams is non-empty; Receivers are started only when it is not empty. receiverInputStreams holds the ReceiverInputDStreams registered with the DStreamGraph, so starting Receivers depends on the input sources.
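For example, the socketTextStream call used in the small example above builds a ReceiverInputDStream whose getReceiver() supplies the actual Receiver object. The following is a paraphrased sketch of Spark's SocketInputDStream (simplified, imports omitted; details differ between versions):

// The input DStream only describes the source; the Receiver it returns is what
// gets shipped to an Executor and started by a ReceiverSupervisor.
private[streaming] class SocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}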
What launchReceivers does is collect the Receiver of every input stream and send them through the endpoint (as a StartAllReceivers message) so they can be started on the worker Executors. Inside it, ReceiverTracker's runDummySparkJob method is invoked first; its purpose is to make sure the executors on all nodes have registered with the driver, so that the Receivers can later be spread across the cluster instead of piling up on a single node.
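For reference, runDummySparkJob looks roughly like this in the Spark source (paraphrased from ReceiverTracker; details may differ between versions):

// Run a cheap shuffle job so that every executor gets registered with the driver
// before Receivers are scheduled; otherwise all Receivers could land on one node.
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}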
The source of ReceiverTracker's launchReceivers method:
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob() // prevents all Receivers from being assigned to the same slave node, ensuring load balancing

  endpoint.send(StartAllReceivers(receivers)) // the endpoint here is in fact the ReceiverTrackerEndpoint
}

How the ReceiverTrackerEndpoint handles the StartAllReceivers message, from its source:
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  ......

The incoming receivers are assigned to the appropriate executors according to a scheduling policy. Receiver scheduling is not handed over to the Spark core; it is done by the Spark Streaming framework itself, precisely so that the core scheduler does not treat Receivers as ordinary jobs and place several of them on the same node.
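The real logic lives in ReceiverSchedulingPolicy.scheduleReceivers. The sketch below is not Spark's code; it is a deliberately simplified illustration of the idea, honoring a preferred location when one is set and otherwise spreading receivers round-robin across the known executor hosts:

// Simplified illustration only -- not Spark's ReceiverSchedulingPolicy.
// Maps each receiver's streamId to the host it should be started on.
def scheduleReceiversSimplified(
    receivers: Seq[Receiver[_]],
    executorHosts: Seq[String]): Map[Int, Seq[String]] = {
  require(executorHosts.nonEmpty, "no executors have registered yet")
  receivers.zipWithIndex.map { case (receiver, i) =>
    val host = receiver.preferredLocation.getOrElse(executorHosts(i % executorHosts.length))
    receiver.streamId -> Seq(host)
  }.toMap
}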
ReceiverTracker's startReceiver method is more involved: based on the planned executors, it starts the receiver by submitting a Spark job over an RDD built from the scheduled locations. The source of ReceiverTracker's startReceiver method:
private def startReceiver(receiver: Receiver[_], scheduledLocations: Seq[TaskLocation]): Unit = {
  ......
  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function run on the worker to start the Receiver: it creates a ReceiverSupervisorImpl
  // and calls its start method to start the supervisor on that node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      // When startReceiverFunc is invoked inside the task, the ReceiverSupervisor is started
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      }
    }
  ......
  // SparkContext's makeRDD is called to create an RDD holding a single element: the Receiver object
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1) // only one Receiver
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  // SparkContext.submitJob shows that Streaming launches a dedicated job for each Receiver,
  // rather than having the Receiver job submitted and executed by a Streaming action
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
}

ReceiverSupervisorImpl extends ReceiverSupervisor, and ReceiverSupervisor is where the startReceiver method is called. startReceiver first calls onReceiverStart to register the Receiver with the ReceiverTracker. If registration succeeds, the Receiver's onStart is called; the Receiver then runs on the Executor and keeps receiving data, handing the received data over to the BlockManager for management. At this point the startup of the Receiver is complete.
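The registration-then-start sequence described above corresponds roughly to ReceiverSupervisor.startReceiver in the Spark source (paraphrased sketch; details differ between versions):

// Register with the driver first; only if the ReceiverTracker accepts the
// registration is the Receiver's own onStart() invoked on the Executor.
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {   // RPC registration with the ReceiverTracker
      logInfo(s"Starting receiver $streamId")
      receiverState = Started
      receiver.onStart()       // the Receiver begins pulling data and calling store()
      logInfo(s"Called receiver $streamId onStart")
    } else {
      // The driver refused to let this receiver start
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}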
The manager of a Receiver is a ReceiverSupervisor. The receivers are iterated over, and each Receiver gets its own ReceiverSupervisorImpl, the concrete subclass of ReceiverSupervisor; that is, one ReceiverSupervisor starts and manages exactly one Receiver. The start called on ReceiverSupervisorImpl is the start method inherited from ReceiverSupervisor, which starts the supervisor; its source:
def start() {
  onStart()
  startReceiver()
}

Returning to ReceiverTracker's startReceiver method: when the job corresponding to a Receiver completes, whether it succeeded or failed, as long as the ReceiverTracker has not been stopped a RestartReceiver message is sent to the ReceiverTrackerEndpoint, and the Receiver is restarted.
Summary: on the Driver side, the ReceiverTracker manages the Receiver tasks on all Executors and owns a ReceiverTrackerEndpoint message endpoint. In the startReceiver method this endpoint submits the Receiver job to run on a specific Executor, and it also receives the messages sent back from the Executor side (such as the result of Receiver registration). On the Executor side there is a ReceiverSupervisor dedicated to managing the Receiver; it is responsible for registering and starting the Receiver and for exchanging information with the ReceiverTracker.
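As an illustration of that registration exchange, ReceiverSupervisorImpl's registration hook looks roughly like the following in the Spark source (paraphrased sketch; in older versions the blocking ask call has a different name):

// Paraphrased sketch of ReceiverSupervisorImpl.onReceiverStart: the executor-side
// supervisor registers its Receiver with the driver-side ReceiverTracker over RPC
// and only proceeds if the driver replies true.
override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askSync[Boolean](msg)
}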