
Analysis of the Spark Shuffle Read Data Source Code

2022-06-22 16:46:00 ZH519080

The compute method is the abstract interface for computing a specified partition. Take CoGroupedRDD as an example (it could just as well be ShuffledRDD; the details of compute differ, but the classes and methods called to read shuffle data are the same). Its implementation of compute is shown in the source below:

override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
  val split = s.asInstanceOf[CoGroupPartition]
  val numRdds = dependencies.length
  val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
  for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
    case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
      val dependencyPartition = split.narrowDeps(depNum).get.split
      // Read the parent RDD's data
      val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
      rddIterators += ((it, depNum))
    case shuffleDependency: ShuffleDependency[_, _, _] =>
      // First obtain the ShuffleManager from SparkEnv, then take the shuffleHandle that was registered with the ShuffleManager from the ShuffleDependency. With the shuffleHandle and the partition ID handled by the current task, obtain a ShuffleReader and call its read interface to read the shuffle map output.
      val it = SparkEnv.get.shuffleManager
        .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
        .read()
      rddIterators += ((it, depNum))
  }
  val map = createExternalMap(numRdds)
  for ((it, depNum) <- rddIterators) {
    map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
  }
  context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
  context.internalMetricsToAccumulators(
    InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
  new InterruptibleIterator(context,
    map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}

As we can see from the source code, the compute operation of an RDD with a wide (shuffle) dependency ultimately obtains a data reader through the getReader method of the ShuffleManager instance in SparkEnv, and then calls the reader's read method to read the shuffle data for the specified partition range.
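As a concrete illustration, the following is a minimal, self-contained job (the application name and sample data are made up for this sketch) whose cogroup call produces a CoGroupedRDD. When the reduce-side tasks of the resulting stage compute their partitions, the compute method above runs and pulls the map output through ShuffleManager.getReader(...).read().

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleReadDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("shuffle-read-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val left  = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 2)
    val right = sc.parallelize(Seq(("a", "x"), ("c", "y")), numSlices = 2)

    // cogroup builds a CoGroupedRDD over two ShuffleDependencies; computing its
    // partitions triggers the shuffle read path described in this article.
    val grouped = left.cogroup(right)
    grouped.collect().foreach(println)

    sc.stop()
  }
}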

The ShuffleReader trait is implemented by its subclass BlockStoreShuffleReader. The source of BlockStoreShuffleReader's read method is:

/** Read and combine the key-value pairs for this reduce task. */
override def read(): Iterator[Product2[K, C]] = {
  val blockFetcherItr = new ShuffleBlockFetcherIterator(
    context,
    blockManager.shuffleClient,
    blockManager,
    /** The metadata that ShuffleMapTasks registered with mapOutputTracker is retrieved here through mapOutputTracker, restricted to the requested partition range. */
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
    /** Defaults to 48 MB. This caps the amount of data fetched in parallel, avoiding excessive bandwidth use on any one target machine while still allowing parallel fetches to speed up reading. */
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
  // Pair each fetched block's unique BlockId with its corresponding input stream
  val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
    blockManager.wrapForCompression(blockId, inputStream)  // lz4, lzf, and snappy compression codecs are supported
  }
  val ser = Serializer.getSerializer(dep.serializer)
  val serializerInstance = ser.newInstance()
  // Create a key-value iterator for each stream
  val recordIter = wrappedStreams.flatMap { wrappedStream =>
    serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
  }
  // Update the task context's shuffle read metrics
  val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
  val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
    recordIter.map(record => {
      readMetrics.incRecordsRead(1)
      record
    }),
    context.taskMetrics().updateShuffleReadMetrics())
  // An interruptible iterator must be used so that the task can be cancelled
  val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
  // Aggregate the data that was read
  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    if (dep.mapSideCombine) { // The data has already been combined on the map side
      val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
      // The per-key results already combined within each map-side partition are aggregated again here; map-side combining greatly reduces network transfer
      dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
    } else { // Only reduce-side aggregation is needed
      val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
      dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
    }
  } else {  // No aggregation is required 
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
  }

  // In the sort-based shuffle implementation, data is sorted by partition ID by default and records within a partition are left unsorted, so the keyOrdering variable indicates whether the data inside each partition should be sorted. If an ordering is defined, the output is sorted accordingly.
  dep.keyOrdering match {  // Decide whether sorting is required
    case Some(keyOrd: Ordering[K]) =>
      // An external sorter is introduced to reduce memory pressure and avoid GC overhead. When there is not enough memory to hold the data being sorted, the spark.shuffle.spill property determines whether to spill to disk; spilling is enabled by default.
      val sorter =
        new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = Some(ser))
      sorter.insertAll(aggregatedIter)
      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
      context.internalMetricsToAccumulators(
        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
      CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
    case None =>
      aggregatedIter // If the data within partitions does not need to be sorted, return the aggregated iterator directly
  }
}
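The 48 MB above comes from the spark.reducer.maxSizeInFlight setting, which caps how much map output a reduce task keeps in flight at once. A minimal sketch of adjusting it, e.g. when building the SparkConf or in spark-shell (the application name and the 96m value are only illustrative, not recommendations):

import org.apache.spark.SparkConf

// A larger limit lets each reduce task fetch more data concurrently at the cost of
// more executor memory per task; a smaller limit reduces memory pressure.
val conf = new SparkConf()
  .setAppName("shuffle-read-tuning-demo")
  .set("spark.reducer.maxSizeInFlight", "96m")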

In BlockStoreShuffleReader's read method, the ShuffleBlockFetcherIterator constructor is invoked, which calls the ShuffleBlockFetcherIterator.initialize method. Inside initialize, the splitLocalRemoteBlocks, fetchUpToMaxBytes, fetchLocalBlocks, and related methods are invoked in turn. First, let's analyze the source of ShuffleBlockFetcherIterator's splitLocalRemoteBlocks method:

private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
  // Fetch from at most 5 nodes in parallel at a time; the size of each request is <= spark.reducer.maxSizeInFlight (48 MB by default) / 5
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val remoteRequests = new ArrayBuffer[FetchRequest]
  var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) {
    totalBlocks += blockInfos.size
    if (address.executorId == blockManager.blockManagerId.executorId) { // The data blocks are local
      localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1) // Filter out empty blocks; when the data lives on the same node as this BlockManager, the blocks go straight into localBlocks
      numBlocksToFetch += localBlocks.size
    } else { // The data is not local
      val iterator = blockInfos.iterator
      var curRequestSize = 0L
      var curBlocks = new ArrayBuffer[(BlockId, Long)]
      while (iterator.hasNext) { // The BlockId format is: shuffle_<shuffleId>_<mapId>_<reduceId>
        val (blockId, size) = iterator.next()
        if (size > 0) { // Skip empty blocks
          curBlocks += ((blockId, size))
          remoteBlocks += blockId // Record the BlockId of each block that lives on a remote machine
          numBlocksToFetch += 1
          curRequestSize += size
        } else if (size < 0) {
          throw new BlockException(blockId, "Negative block size " + size)
        } 
        if (curRequestSize >= targetRequestSize) {
          remoteRequests += new FetchRequest(address, curBlocks)
          curBlocks = new ArrayBuffer[(BlockId, Long)]
          curRequestSize = 0
        }
      } // For non-local data, a FetchRequest is generated once curRequestSize reaches maxBytesInFlight / 5; each request records the block location (address), blockId, and block size
      if (curBlocks.nonEmpty) {
        remoteRequests += new FetchRequest(address, curBlocks)
      }
    } // Note: FetchRequest can cause memory problems; if a single block is too large, fetching it may occupy too much memory and lead to an OOM
  }
  remoteRequests
}
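To make the batching concrete, here is a small standalone sketch that reproduces only the grouping arithmetic above with made-up block sizes (the String block names stand in for BlockId; this is illustrative, not the Spark code itself). With the default 48 MB maxBytesInFlight, targetRequestSize is roughly 9.6 MB, so six 4 MB blocks from one executor end up in two FetchRequests of three blocks each; combined with the gating in fetchUpToMaxBytes shown below, roughly five such requests can be outstanding at once, which is where the "at most 5 parallel fetches" rule of thumb comes from.

import scala.collection.mutable.ArrayBuffer

object FetchRequestSplitSketch extends App {
  val maxBytesInFlight = 48L * 1024 * 1024
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L) // ~9.6 MB

  val blockSizes = Seq.fill(6)(4L * 1024 * 1024)             // six made-up 4 MB blocks
  val requests = ArrayBuffer[ArrayBuffer[(String, Long)]]()
  var curBlocks = ArrayBuffer[(String, Long)]()
  var curRequestSize = 0L

  for ((size, i) <- blockSizes.zipWithIndex if size > 0) {
    curBlocks += ((s"block_$i", size))
    curRequestSize += size
    if (curRequestSize >= targetRequestSize) {               // cut a request once the target size is reached
      requests += curBlocks
      curBlocks = ArrayBuffer[(String, Long)]()
      curRequestSize = 0L
    }
  }
  if (curBlocks.nonEmpty) requests += curBlocks

  println(requests.map(_.size))                              // ArrayBuffer(3, 3): two requests of three blocks each
}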

ShuffleBlockFetcherIterator's fetchUpToMaxBytes method sends the requests that fetch remote data; a request is sent only when the bytes currently in flight plus the size of the next request do not exceed maxBytesInFlight:

private def fetchUpToMaxBytes(): Unit = {
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }
}

After fetchUpToMaxBytes has issued the requests for remote data, the fetchLocalBlocks method is called to fetch the local data. The source of ShuffleBlockFetcherIterator's fetchLocalBlocks is:

private[this] def fetchLocalBlocks() {
  val iter = localBlocks.iterator
  while (iter.hasNext) {
    val blockId = iter.next()
    try {
      val buf = blockManager.getBlockData(blockId)
      shuffleMetrics.incLocalBlocksFetched(1)
      shuffleMetrics.incLocalBytesRead(buf.size)
      buf.retain()
      results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf))
    } catch {
      case e: Exception =>
        results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))
        return
    }
  }
}

The fetchLocalBlocks method obtains a local data block by calling BlockManager's getBlockData method, which in turn calls getBlockData on either IndexShuffleBlockResolver or FileShuffleBlockResolver (both of which extend the ShuffleBlockResolver trait):

The flow is: ShuffleBlockFetcherIterator.fetchLocalBlocks -> BlockManager.getBlockData -> ShuffleBlockResolver.getBlockData.

IndexShuffleBlockResolver's getBlockData locates the block's files via DiskBlockManager's getFile method, while FileShuffleBlockResolver's getBlockData is implemented with the FileSegmentManagedBuffer constructor.
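As a rough sketch of what the index-based lookup amounts to (this paraphrases the behaviour described above rather than reproducing the exact Spark internals; the helper name getBlockDataSketch and the returned tuple are inventions of this example): each map task writes one data file plus an index file of long offsets, and the segment for a reduce partition is the byte range between two consecutive index entries, which the real code then wraps in a FileSegmentManagedBuffer.

import java.io.{DataInputStream, File, FileInputStream}

object IndexLookupSketch {
  // Returns the (offset, length) of the shuffle segment for `reduceId` inside `dataFile`,
  // as recorded in `indexFile`. The real IndexShuffleBlockResolver skips fully and wraps
  // the range in a FileSegmentManagedBuffer(transportConf, dataFile, offset, length).
  def getBlockDataSketch(indexFile: File, dataFile: File, reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skip(reduceId * 8L)      // jump to the offset entry for this reduce partition
      val offset = in.readLong()
      val nextOffset = in.readLong()
      (offset, nextOffset - offset)
    } finally {
      in.close()
    }
  }
}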

