
Spark streaming receiver startup and data receiving

2022-06-22 16:57:00 ZH519080

Develop the habit of taking notes!!!

A small example:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

def fromsocket(ssc: StreamingContext): Unit = {
  /** A checkpoint directory must be set before updateStateByKey can be used */
  ssc.checkpoint("hdfs://zhumaster:8020/data/checkpoint")
  /** When a socket is used as the input source, first start a listener with `nc -lk <port>`.
    * The port used here is 9999; any port can be chosen.
    * Use `netstat -ap | grep 9999` to check the state of port 9999. */
  val soDS: ReceiverInputDStream[String] = ssc.socketTextStream("zhumaster", 9999)
  val wordCount: DStream[(String, Int)] = soDS.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
  wordCount.foreachRDD { rdd =>
    /** Skip empty RDDs, so the foreach job is only submitted when the batch actually contains data */
    if (!rdd.isEmpty()) rdd.foreach(println)
  }
  wordCount.print()
  ssc.start()
  ssc.awaitTermination()
  ssc.stop(true)
}
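The example above processes each batch independently, and its comment mentions updateStateByKey. As a rough illustration only, here is a hypothetical pure-Scala model (no Spark dependency; `WordCountModel`, `countBatch` and `runningTotals` are illustrative names) of what one batch of the word count computes, plus an update function with the `(new values, old state) => new state` shape that updateStateByKey expects, folded over several batches.

```scala
// Hypothetical pure-Scala model of one streaming batch; no Spark dependency.
object WordCountModel {
  // Same logic as flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _), applied to one batch
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  // Same shape as an updateStateByKey update function: (new values, previous state) => new state
  def updateFunc(newValues: Seq[Int], state: Option[Int]): Option[Int] =
    Some(newValues.sum + state.getOrElse(0))

  // Folds several batches into running totals, like a stateful stream across batch intervals
  def runningTotals(batches: Seq[Seq[String]]): Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int]) { (state, batch) =>
      val counts = countBatch(batch)
      val keys = state.keySet ++ counts.keySet
      keys.flatMap { k =>
        updateFunc(counts.get(k).toSeq, state.get(k)).map(k -> _)
      }.toMap
    }
}
```

Keys that receive no new values in a batch still pass through `updateFunc` with an empty sequence, which is why state survives between batches.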

StreamingContext still uses SparkContext as the entry point to Spark:

def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}

StreamingContext.start mainly calls JobScheduler.start. When the JobScheduler starts, it launches the EventLoop, StreamingListenerBus, ReceiverTracker and JobGenerator objects. JobGenerator is responsible for generating jobs (building the concrete RDD DAG for each batch); JobScheduler is responsible for scheduling jobs, playing a role similar to the DAGScheduler in Spark Core; ReceiverTracker is responsible for obtaining the data.

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started
  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start() // Start the event-loop thread that processes JobScheduler events
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)  // Start the listener bus, which updates the Streaming tab of the Spark UI
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc) // Tracks all input streams and their input data statistics
  receiverTracker.start() // Handles data reception, data caching and block generation
  jobGenerator.start() // Initializes the DStreamGraph, turns DStreams into RDDs, and generates and submits jobs
  logInfo("Started JobScheduler")
}
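The EventLoop created in JobScheduler.start dispatches JobSchedulerEvents to processEvent on its own thread. To show the pattern (a queue drained by a single daemon thread, with onReceive/onError callbacks), here is a simplified, hypothetical re-implementation; `SimpleEventLoop` and its methods are illustrative and are not Spark's actual EventLoop class.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified event loop: one daemon thread takes events off a queue
// and hands each one to onReceive; exceptions from onReceive go to onError.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = queue.take() // blocks until an event is posted
          try onReceive(event)
          catch { case e: Exception => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take()
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = { stopped.set(true); thread.interrupt(); thread.join() }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
```

Because a single thread consumes the queue, events are handled one at a time in posting order, so the handler needs no extra locking for state it alone touches.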

This note records how the Receiver is started and how it receives data.

The Receiver continuously receives data from an external source and reports the metadata of the received data to the Driver. Every batchDuration, jobs are generated from the reported data and the corresponding RDD transformations are executed.
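To make the "every batchDuration" step concrete, here is a hypothetical model (not Spark code; `BlockMeta`, `BatchAllocator` and the rule that a block belongs to the batch ending at the next multiple of the duration are assumptions for illustration) of how block metadata reported to the driver could be grouped into batches.

```scala
// Hypothetical model: receivers report block metadata to the driver, and at each
// batch boundary the blocks reported since the previous boundary go into that batch.
final case class BlockMeta(streamId: Int, blockId: String, reportedAtMs: Long, numRecords: Long)

object BatchAllocator {
  // Groups blocks by batch end time, assumed to be the next multiple of batchDurationMs
  def allocate(blocks: Seq[BlockMeta], batchDurationMs: Long): Map[Long, Seq[BlockMeta]] =
    blocks.groupBy(b => (b.reportedAtMs / batchDurationMs + 1) * batchDurationMs)

  // Number of records each batch's job would process
  def recordsPerBatch(blocks: Seq[BlockMeta], batchDurationMs: Long): Map[Long, Long] =
    allocate(blocks, batchDurationMs).map { case (t, bs) => t -> bs.map(_.numRecords).sum }
}
```

Only metadata travels to the driver in this model; the records themselves stay wherever the blocks were stored.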

Data reception starts from ReceiverTracker.start. Receivers are started on the Executors to receive stream data continuously. The received records are discrete, so they must be grouped into Blocks and stored by the BlockManager, after which the Driver is notified of the stored Blocks' metadata. ReceiverTracker.start first checks whether any receiver input streams exist, and then calls launchReceivers to start the related processes on the Executors:

def start(): Unit = synchronized {
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint("ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

Before starting any Receiver, the tracker checks receiverInputStreams and only starts Receivers when it is non-empty. receiverInputStreams holds the ReceiverInputDStreams registered with the DStreamGraph, so whether Receivers are started at all depends on the input sources.
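As noted above, received records are discrete and have to be grouped into Blocks before the BlockManager stores them. In Spark this is done on the executor by a block generator driven by a timer (the interval is set by `spark.streaming.blockInterval`, 200ms by default). The sketch below is a hypothetical simplification: `SimpleBlockGenerator` and `Block` are illustrative names, and `cutBlock()` is called by hand instead of by a timer.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical block: an id plus the records buffered during one interval
final case class Block[T](id: Long, records: Vector[T])

// Hypothetical, simplified block generator: records are buffered one at a time and
// each cutBlock() call turns the current buffer into a block handed to pushBlock
// (which would store it and report its metadata to the driver).
class SimpleBlockGenerator[T](pushBlock: Block[T] => Unit) {
  private val buffer = ArrayBuffer.empty[T]
  private var nextId = 0L

  def addData(record: T): Unit = synchronized { buffer += record }

  def cutBlock(): Unit = {
    val block = synchronized {
      if (buffer.isEmpty) None // no data in this interval: no block
      else {
        val b = Block(nextId, buffer.toVector)
        nextId += 1
        buffer.clear()
        Some(b)
      }
    }
    block.foreach(pushBlock) // push outside the lock so addData is not blocked
  }
}
```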

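launchReceivers collects the Receiver of each ReceiverInputDStream and sends a StartAllReceivers message through the endpoint, so that the Receivers are started on the workers' Executors. Before that it calls ReceiverTracker.runDummySparkJob, which runs a small throwaway job to make sure the executors have registered, so that the Receivers are not all scheduled onto the same node.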

Source of ReceiverTracker.launchReceivers:

private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  runDummySparkJob()  // Make sure executors have registered, so the Receivers are not all assigned to the same node (load balancing)
  endpoint.send(StartAllReceivers(receivers)) // endpoint here is the ReceiverTrackerEndpoint
}
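Why the dummy job matters can be seen with a deliberately naive placement rule. The sketch below is hypothetical (`NaiveReceiverPlacement` is not Spark code): receivers can only be placed on executors that are already registered, so if placement happened while only one executor had registered, every receiver would land on that host.

```scala
// Hypothetical illustration: place receivers round-robin over the executors known right now.
object NaiveReceiverPlacement {
  def assign(streamIds: Seq[Int], registeredExecutors: Seq[String]): Map[Int, String] = {
    require(registeredExecutors.nonEmpty, "no executors registered yet")
    streamIds.zipWithIndex.map { case (id, i) =>
      id -> registeredExecutors(i % registeredExecutors.size)
    }.toMap
  }
}
```

With `Seq("host1")` as the only registered executor all three receivers go to `host1`; once three executors have registered, they are spread over three hosts.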

How ReceiverTrackerEndpoint handles the StartAllReceivers message; its source:

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
......

Using a scheduling policy, each Receiver is assigned its executors. Receiver scheduling is not delegated to the Spark core scheduler but done by the Spark Streaming framework itself; the aim is to prevent Spark core from treating a Receiver as an ordinary job and placing several Receivers on the same node.
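The idea of a streaming-side scheduling policy can be sketched as follows. This is a hypothetical simplification, not Spark's ReceiverSchedulingPolicy: `SimpleReceiverScheduler` and `ExecutorLoc` are illustrative, receivers with a preferred host get the least-loaded executor on that host, and the rest are spread over the least-loaded executors overall.

```scala
import scala.collection.mutable

// Hypothetical executor location: host plus executor id
final case class ExecutorLoc(host: String, executorId: String)

object SimpleReceiverScheduler {
  // receivers: (streamId, optional preferred host)
  def schedule(receivers: Seq[(Int, Option[String])], executors: Seq[ExecutorLoc]): Map[Int, ExecutorLoc] = {
    require(executors.nonEmpty, "no executors available")
    val load = mutable.Map.empty[ExecutorLoc, Int] ++= executors.map(_ -> 0)

    // Picks the candidate with the fewest receivers so far (first one on ties)
    def pickLeastLoaded(candidates: Seq[ExecutorLoc]): ExecutorLoc = {
      val chosen = candidates.minBy(load)
      load(chosen) += 1
      chosen
    }

    // Receivers with a preferred host are placed first so they get first pick on that host
    val (withPref, withoutPref) = receivers.partition(_._2.isDefined)
    val placed = withPref.map { case (id, pref) =>
      val onHost = executors.filter(e => pref.contains(e.host))
      id -> pickLeastLoaded(if (onHost.nonEmpty) onHost else executors)
    }
    val rest = withoutPref.map { case (id, _) => id -> pickLeastLoaded(executors) }
    (placed ++ rest).toMap
  }
}
```

Tracking load explicitly is what keeps receivers apart; a generic task scheduler has no notion that these long-running tasks should not share a node.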

ReceiverTracker.startReceiver is more involved: it starts the Receiver on the planned executors by creating an RDD whose preferred locations come from scheduledLocations and running it as a Spark job. Source of ReceiverTracker.startReceiver:

private def startReceiver(receiver: Receiver[_],scheduledLocations: Seq[TaskLocation]): Unit = {
......
  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Start the Receiver on the worker: ReceiverSupervisorImpl.start launches the supervisor on this node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      // When startReceiverFunc is called, the ReceiverSupervisor is started
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      }......
  // Create the RDD with SparkContext.makeRDD; it holds a single element, the Receiver object
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1) // only one Receiver
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
  // The SparkContext.submitJob call shows that Streaming launches one job per Receiver, instead of running Receivers through a Streaming output action
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
}
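The task function above only starts a supervisor on the first attempt (`attemptNumber() == 0`) and expects exactly one Receiver in its partition. A hypothetical model of that guard (`ReceiverTaskModel.runTask` is illustrative, and the receiver is represented by a plain string) could look like this:

```scala
// Hypothetical model of the task that carries one receiver (not Spark code).
object ReceiverTaskModel {
  // Only the first attempt starts a supervisor; a retry launched by the task scheduler
  // returns immediately, leaving rescheduling to the ReceiverTracker.
  def runTask(attemptNumber: Int, receivers: Iterator[String], startSupervisor: String => Unit): Boolean =
    if (attemptNumber == 0) {
      val receiver = receivers.next()
      require(!receivers.hasNext, "a receiver partition must hold exactly one receiver")
      startSupervisor(receiver)
      true
    } else false
}
```

Returning early on retries means a failed receiver task simply completes, which triggers the `future.onComplete` handler and lets the tracker decide where to restart it.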

ReceiverSupervisorImpl extends ReceiverSupervisor, whose startReceiver method first calls onReceiverStart to register the Receiver with the ReceiverTracker. If registration succeeds, it calls the Receiver's onStart, which starts the Receiver on the Executor to receive data continuously; the received data is handed to the BlockManager for management. At this point the Receiver is up and running.

The manager of a Receiver is the ReceiverSupervisor. Iterating over the receivers, each Receiver gets its own ReceiverSupervisorImpl (the concrete ReceiverSupervisor implementation), so one ReceiverSupervisor starts and manages exactly one Receiver. Calling start on a ReceiverSupervisorImpl invokes the start method inherited from ReceiverSupervisor; its source:

def start() {
  onStart()
  startReceiver()
}
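The start sequence described above (onStart, then startReceiver, which only calls the receiver's onStart after successful registration) follows a template-method pattern. The sketch below is a hypothetical simplification; `SimpleReceiver`, `SimpleSupervisor` and `receiverStarted` are illustrative names, and a refused registration just leaves the receiver unstarted.

```scala
// Hypothetical receiver with a single start hook
trait SimpleReceiver {
  def onStart(): Unit
}

// Hypothetical, simplified supervisor: start() = onStart(); startReceiver(),
// and the receiver is started only if registration with the tracker succeeds.
abstract class SimpleSupervisor(receiver: SimpleReceiver) {
  @volatile var receiverStarted = false

  protected def onStart(): Unit = ()        // e.g. start block generators
  protected def onReceiverStart(): Boolean  // register with the tracker; true if accepted

  def start(): Unit = {
    onStart()
    startReceiver()
  }

  def startReceiver(): Unit =
    if (onReceiverStart()) {
      receiver.onStart()
      receiverStarted = true
    }
}
```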

Back in ReceiverTracker.startReceiver: when the job of a Receiver completes, whether successfully or not, a RestartReceiver message is sent to the ReceiverTrackerEndpoint to restart the Receiver, as long as the ReceiverTracker has not been stopped.
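The restart rule can be reduced to a small decision function. This is a hypothetical model of the `future.onComplete` handler (`RestartPolicy`, `Restart` and `JobFinished` are illustrative names): the job's outcome does not change the decision, only whether the tracker is stopping does.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical actions the tracker can take when a receiver job completes
sealed trait TrackerAction
final case class Restart(streamId: Int) extends TrackerAction
final case class JobFinished(streamId: Int) extends TrackerAction

object RestartPolicy {
  def onReceiverJobComplete(streamId: Int, result: Try[Unit], trackerStopping: Boolean): TrackerAction =
    result match {
      case Success(_) =>
        if (trackerStopping) JobFinished(streamId) else Restart(streamId)
      case Failure(_) =>
        // Handled the same way as success; the cause could be logged here
        if (trackerStopping) JobFinished(streamId) else Restart(streamId)
    }
}
```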

Summary: on the Driver side, the ReceiverTracker manages the Receiver tasks on all Executors. It owns a message endpoint, the ReceiverTrackerEndpoint, which (in startReceiver) runs each Receiver's job on a specific Executor and receives messages sent from the Executor side (such as Receiver registration). On the Executor side, a ReceiverSupervisor is dedicated to managing the Receiver: it handles the Receiver's registration and startup and its communication with the ReceiverTracker.
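The driver/executor exchange in this summary can be modeled as a tiny message protocol. Everything below is hypothetical (`Register`, `ReportBlock`, `Deregister` and `SimpleTracker` are not Spark's message classes): the executor-side supervisor registers its receiver and reports blocks, and the driver-side tracker replies to each message and records block metadata per stream.

```scala
import scala.collection.mutable

// Hypothetical messages sent from the executor-side supervisor to the driver-side tracker
sealed trait ReceiverMessage
final case class Register(streamId: Int, host: String) extends ReceiverMessage
final case class ReportBlock(streamId: Int, blockId: String) extends ReceiverMessage
final case class Deregister(streamId: Int) extends ReceiverMessage

class SimpleTracker(knownStreams: Set[Int]) {
  private val active = mutable.Map.empty[Int, String]           // streamId -> host
  private val blocks = mutable.Map.empty[Int, Vector[String]]   // streamId -> reported block ids

  // Returns the reply sent back to the executor (true = accepted)
  def handle(msg: ReceiverMessage): Boolean = msg match {
    case Register(id, host) if knownStreams.contains(id) =>
      active(id) = host; true
    case Register(_, _) => false // unknown stream: refuse the registration
    case ReportBlock(id, blockId) if active.contains(id) =>
      blocks(id) = blocks.getOrElse(id, Vector.empty) :+ blockId; true
    case ReportBlock(_, _) => false // blocks from unregistered receivers are ignored
    case Deregister(id) => active.remove(id).isDefined
  }

  def blocksOf(streamId: Int): Vector[String] = blocks.getOrElse(streamId, Vector.empty)
}
```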


Copyright notice
This article was written by [ZH519080]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/173/202206221523254234.html