
Source code analysis of Spark cache

2022-06-22 16:47:00 ZH519080

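private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = { ......
  // First time this RDD is marked for persisting: register it, and do so only once
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this)) // Let ContextCleaner remove the cached data once this RDD is garbage-collected
    sc.persistRDD(this) // Record the RDD in SparkContext.persistentRdds so cached RDDs can be tracked
  }
  storageLevel = newLevel
  this
}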

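The first time persist is called on an RDD (while its storageLevel is still StorageLevel.NONE), ContextCleaner's registerRDDForCleanup registers the RDD for cleanup. This does not clean the cache right away. It arranges for the RDD's cached data to be removed once the RDD object is garbage-collected. SparkContext's persistRDD method then puts (rdd.id, rdd) into persistentRdds, a HashMap whose key is rdd.id and whose value is a timestamped reference to the RDD. persistentRdds keeps track of the RDDs that have been marked as persisted.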
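Here is a minimal Scala sketch of this bookkeeping, under simplifying assumptions. `MiniContext`, `MiniRdd` and the `Level` objects are hypothetical stand-ins, not Spark classes. A list of IDs plays the role of ContextCleaner, and a plain `mutable.HashMap` replaces persistentRdds. The check that rejects changing an existing storage level (hidden behind `......` above) is left out. The sketch only shows that the first persist call, and no later one, registers the RDD.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the persist bookkeeping (not Spark's real classes).
object PersistSketch {
  sealed trait Level
  case object NoneLevel extends Level
  case object MemoryOnly extends Level

  class MiniContext {
    // Stand-in for SparkContext.persistentRdds: rdd.id -> (registration time, rdd)
    val persistentRdds = mutable.HashMap.empty[Int, (Long, MiniRdd)]
    // Stand-in for ContextCleaner: ids of RDDs registered for cleanup
    val cleanupRegistrations = mutable.ArrayBuffer.empty[Int]

    def persistRDD(rdd: MiniRdd): Unit =
      persistentRdds.update(rdd.id, (System.currentTimeMillis(), rdd))
  }

  class MiniRdd(val id: Int, sc: MiniContext) {
    var storageLevel: Level = NoneLevel

    def persist(newLevel: Level): this.type = {
      // Only the first call (level still NONE) registers the RDD
      if (storageLevel == NoneLevel) {
        sc.cleanupRegistrations += id
        sc.persistRDD(this)
      }
      storageLevel = newLevel
      this
    }
  }
}
```

If you call `persist` twice on the same `MiniRdd`, `cleanupRegistrations` and `persistentRdds` each still hold exactly one entry for it.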

The RDD storage (caching) phase

Calling persist only marks the RDD; nothing is stored yet. When an action is executed on the RDD, DAGScheduler.submitJob is called to submit the job. The storage (caching) phase is only really entered when the stage's ShuffleMapTask or ResultTask runs, because their runTask method calls the RDD's iterator method. The source of RDD.iterator:

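final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // Marked as persisted: read the partition from the cache, or compute and cache it
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    // Not persisted: compute the partition or read it from a checkpoint
    computeOrReadCheckpoint(split, context)
  }
}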
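A small hypothetical model can show the effect of this branch. It is not Spark's API. `MiniRdd` counts how often it computes a partition. A shared `mutable.HashMap` keyed by (RDD id, partition index) stands in for the BlockManager and for the `RDDBlockId(rdd.id, partition.index)` key used by getOrCompute. A persisted RDD computes a partition once and serves later reads from the map. An unpersisted RDD recomputes the partition on every call.

```scala
import scala.collection.mutable

// Hypothetical model of the branch in RDD.iterator (not Spark's real classes).
object IteratorSketch {
  // Stand-in for the BlockManager, keyed like RDDBlockId: (rdd id, partition index)
  val blockStore = mutable.HashMap.empty[(Int, Int), Vector[Int]]

  final class MiniRdd(val id: Int, val persisted: Boolean, partitions: Vector[Vector[Int]]) {
    var computeCalls = 0

    // Stand-in for computeOrReadCheckpoint: produce the partition from scratch
    def computeOrReadCheckpoint(split: Int): Iterator[Int] = {
      computeCalls += 1
      partitions(split).iterator
    }

    // Same shape as RDD.iterator: `persisted` plays the role of storageLevel != NONE
    def iterator(split: Int): Iterator[Int] =
      if (persisted) getOrCompute(split) else computeOrReadCheckpoint(split)

    // Simplified getOrCompute: serve from the store, or compute once and store
    private def getOrCompute(split: Int): Iterator[Int] =
      blockStore.get((id, split)) match {
        case Some(values) => values.iterator
        case None =>
          val values = computeOrReadCheckpoint(split).toVector
          blockStore.update((id, split), values)
          values.iterator
      }
  }
}
```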

A storageLevel other than NONE means that persist (or cache) has already been called on the RDD, although the data itself may not be stored yet. In that case iterator asks CacheManager for the data directly (getOrCompute). Otherwise the partition is either computed or read from the checkpoint (RDD.computeOrReadCheckpoint). The source of CacheManager.getOrCompute:

def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {
  val key = RDDBlockId(rdd.id, partition.index) // Block ID of this RDD partition
  blockManager.get(key) match { // Look the block up in the BlockManager
    case Some(blockResult) =>
      // The partition is already stored: update the input metrics and return it
      val existingMetrics = context.taskMetrics
        .getInputMetricsForReadMethod(blockResult.readMethod)
      existingMetrics.incBytesRead(blockResult.bytes)

      val iter = blockResult.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, iter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }
    case None => // The RDD is marked as persisted, but this partition is not stored
      // If another thread is already computing this partition, wait for it and read its
      // result from the BlockManager; otherwise take the loading lock (returns None)
      val storedValues = acquireLockForPartition[T](key)
      if (storedValues.isDefined) {
        return new InterruptibleIterator[T](context, storedValues.get)
      }
      try {
        // Still no data: read it from the checkpoint directory if the RDD has been
        // checkpointed, otherwise compute it from the parent RDD
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)
        if (context.isRunningLocally) { // A task running locally does not persist its result
          return computedValues
        }
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        // The data was missing, so persist a copy of what was just read or recomputed
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        // Record the blocks updated by this task in its metrics
        val metrics = context.taskMetrics
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)
      } finally {......}
  }
}
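The `acquireLockForPartition` step deserves a closer look. As I understand the Spark 1.x CacheManager, it keeps a set of partitions that are currently being loaded. The first task that misses the cache marks the partition as loading and computes it. Other tasks on the same executor that need the same partition wait, then take the stored result. The elided `finally` block removes the mark and wakes the waiters. The sketch below models that idea with hypothetical names: `LoadingLockSketch`, string block IDs, and a `HashMap` in place of the BlockManager. After waiting, it checks the store again, so a partition that another thread has already finished is not computed a second time.

```scala
import scala.collection.mutable

// Hypothetical model of CacheManager's per-partition loading lock (names are illustrative).
object LoadingLockSketch {
  private val loading = mutable.HashSet.empty[String]            // partitions being computed
  private val store = mutable.HashMap.empty[String, Vector[Int]] // stand-in for the BlockManager

  private def lookup(key: String): Option[Vector[Int]] = store.synchronized(store.get(key))

  // None: the caller now holds the loading lock and must compute the partition.
  // Some: another thread already stored it, so the caller can reuse the result.
  private def acquireLockForPartition(key: String): Option[Vector[Int]] =
    loading.synchronized {
      while (loading.contains(key)) loading.wait() // another thread is computing it
      val existing = lookup(key)
      if (existing.isEmpty) loading += key
      existing
    }

  def getOrCompute(key: String, compute: () => Vector[Int]): Vector[Int] =
    lookup(key).orElse(acquireLockForPartition(key)) match {
      case Some(values) => values
      case None =>
        try {
          val values = compute()
          store.synchronized { store.update(key, values) }
          values
        } finally {
          // Release the lock and wake up threads waiting for this partition
          loading.synchronized {
            loading -= key
            loading.notifyAll()
          }
        }
    }
}
```

The result is stored before the loading mark is removed. A waiting thread that wakes up therefore always finds the value.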

CacheManager.getOrCompute first tries to fetch the persisted (cached) data from the BlockManager. If the partition is there, it is returned directly (case Some(blockResult)). The RDD may be marked as persisted while the partition's data is still unavailable, for example because it has not been computed yet or has been dropped. In that case the data is read from the checkpoint directory or computed from the parent RDD (computeOrReadCheckpoint), and a copy of the result is persisted. The source of RDD.computeOrReadCheckpoint:

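private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    // After checkpointing, the first parent is the RDD that reads the checkpoint data
    firstParent[T].iterator(split, context)
  } else {
    // Otherwise compute the partition, usually from the parent RDDs' iterators
    compute(split, context)
  }
}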

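computeOrReadCheckpoint works as follows. If the RDD has been checkpointed and the checkpoint has been materialized, the partition is read from the checkpoint data through firstParent, because after checkpointing the RDD's only parent is the RDD that reads the checkpoint files. Otherwise compute is called, which for most RDDs pulls records from the parent RDD's iterator and applies the RDD's own function to them.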
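A small hypothetical model of this branch follows; the class names are illustrative, not Spark's. `checkpoint` saves every partition once and installs a `SavedRdd` as the only parent, playing the role of the RDD that reads checkpoint files. After that, reads no longer call `compute`. `MappedRdd.compute` shows the normal path, which pulls records from the parent's iterator. Real checkpointing writes files through a separate job, and that is not modeled here.

```scala
// Hypothetical model of computeOrReadCheckpoint (class names are illustrative, not Spark's).
object CheckpointSketch {
  abstract class MiniRdd {
    // Set once checkpointed: the RDD's only parent then reads the saved data
    private var checkpointParent: Option[MiniRdd] = None

    def compute(split: Int): Iterator[Int]

    def iterator(split: Int): Iterator[Int] = computeOrReadCheckpoint(split)

    def computeOrReadCheckpoint(split: Int): Iterator[Int] = checkpointParent match {
      case Some(parent) => parent.iterator(split) // read the checkpointed partition
      case None => compute(split)                 // compute it from the lineage
    }

    // Materialize every partition once and make the saved copy the only parent
    def checkpoint(numPartitions: Int): Unit = {
      val saved = Vector.tabulate(numPartitions)(i => compute(i).toVector)
      checkpointParent = Some(new SavedRdd(saved))
    }
  }

  // Stand-in for the RDD that reads checkpoint files
  final class SavedRdd(saved: Vector[Vector[Int]]) extends MiniRdd {
    def compute(split: Int): Iterator[Int] = saved(split).iterator
  }

  // A map-like RDD: compute pulls records from the parent's iterator and applies f
  final class MappedRdd(parent: MiniRdd, f: Int => Int) extends MiniRdd {
    var computeCalls = 0
    def compute(split: Int): Iterator[Int] = {
      computeCalls += 1
      parent.iterator(split).map(f)
    }
  }
}
```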

When getOrCompute finds no persisted data, the result read from the checkpoint directory or recomputed has to be persisted (cached) again. This is done by CacheManager.putInBlockManager. Its source:

private def putInBlockManager[T](
    key: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
    effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
  val putLevel = effectiveStorageLevel.getOrElse(level)
  if (!putLevel.useMemory) {
    // This RDD is not cached in memory, so the computed values can be passed to the
    // BlockManager directly as an iterator instead of being unrolled in memory first
    updatedBlocks ++= blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
    blockManager.get(key) match {
      case Some(v) => v.data.asInstanceOf[Iterator[T]]
      case None =>
        throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  } else {
    // This RDD is cached in memory. The values cannot be handed to the BlockManager as an
    // iterator and read back later, because the partition may be dropped from memory
    // before it is read back, so it is unrolled safely first
    blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
      case Left(arr) =>
        // The whole partition was unrolled: cache it in memory
        updatedBlocks ++= blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
        arr.iterator.asInstanceOf[Iterator[T]]
      case Right(it) =>
        // Not enough space to cache the whole partition in memory
        val returnValues = it.asInstanceOf[Iterator[T]]
        if (putLevel.useDisk) {
          // The level also allows disk (e.g. MEMORY_AND_DISK): retry with a disk-only level
          val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
            useOffHeap = false, deserialized = false, putLevel.replication)
          putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
        } else {
          returnValues
        }
    }
  }
}

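When there is not enough memory to hold the whole partition, CacheManager.putInBlockManager writes a MEMORY_AND_DISK partition to disk instead. A level without disk, such as MEMORY_ONLY, simply returns the values without caching them. In this way, data read from the checkpoint directory or recomputed from the parent RDD is cached (persisted) again in the BlockManager according to its StorageLevel.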
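The memory-then-disk fallback can be sketched as below. This is a simplified, hypothetical model, not the real MemoryStore. The memory budget is counted in elements rather than bytes, string keys replace BlockId, and two `HashMap`s stand in for the memory and disk stores. In this sketch, `unrollSafely` returns `Left` with the whole partition when it fits. When it does not fit, it returns `Right` with an iterator over the already-unrolled prefix followed by the remaining values, so no record is lost when the level falls back to disk.

```scala
import scala.collection.mutable

// Hypothetical model of putInBlockManager's memory-then-disk fallback (not Spark's real API).
object PutInBlockManagerSketch {
  final case class Level(useMemory: Boolean, useDisk: Boolean)

  // Memory budget is counted in elements here; the real MemoryStore counts bytes
  final class Stores(memoryBudget: Int) {
    val memory = mutable.HashMap.empty[String, Vector[Int]]
    val disk = mutable.HashMap.empty[String, Vector[Int]]

    // Left(all values) if the partition fits in the budget, otherwise
    // Right(iterator over the unrolled prefix followed by the remaining values)
    def unrollSafely(values: Iterator[Int]): Either[Vector[Int], Iterator[Int]] = {
      val unrolled = mutable.ArrayBuffer.empty[Int]
      while (values.hasNext && unrolled.size < memoryBudget) unrolled += values.next()
      if (!values.hasNext) Left(unrolled.toVector)
      else Right(unrolled.iterator ++ values)
    }
  }

  def putInBlockManager(stores: Stores, key: String, values: Iterator[Int], level: Level): Iterator[Int] =
    if (!level.useMemory) {
      // Not cached in memory: write to disk directly, then read the block back
      stores.disk.update(key, values.toVector)
      stores.disk(key).iterator
    } else {
      stores.unrollSafely(values) match {
        case Left(arr) =>
          stores.memory.update(key, arr) // the whole partition fits: cache it in memory
          arr.iterator
        case Right(it) if level.useDisk =>
          // Not enough memory, but the level allows disk: retry with a disk-only level
          putInBlockManager(stores, key, it, Level(useMemory = false, useDisk = true))
        case Right(it) =>
          it // memory-only level and no room: return the values without caching them
      }
    }
}
```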

putInBlockManager calls BlockManager.putIterator to store the result values in the BlockManager as an iterator. putIterator in turn delegates to BlockManager.doPut, which is where data is actually written into the BlockManager. The source of BlockManager.doPut:

private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { ......
  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  val putBlockInfo = { // Create a BlockInfo for this block, unless one already exists
    val tinfo = new BlockInfo(level, tellMaster)
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    if (oldBlockOpt.isDefined) {
      // The block already exists: wait until it is ready, and do not re-add it if that put succeeded
      if (oldBlockOpt.get.waitForReady()) {
        return updatedBlocks
      }
      oldBlockOpt.get
    } else {
      tinfo
    }
  } ......
  putBlockInfo.synchronized { // Lock the BlockInfo so concurrent threads access it one at a time
    var marked = false
    try {
      // Choose the BlockStore according to the storage level: MemoryStore, ExternalBlockStore or DiskStore
      val (returnValues, blockStore: BlockStore) = {
        if (putLevel.useMemory) { // MemoryStore
          (true, memoryStore)
        } else if (putLevel.useOffHeap) { // ExternalBlockStore
          (false, externalBlockStore)
        } else if (putLevel.useDisk) { // DiskStore
          (putLevel.replication > 1, diskStore)
        } else { ...... }
      }
      // Actually write the data into the chosen store: IteratorValues, ArrayValues or ByteBufferValues
      val result = data match {
        case IteratorValues(iterator) =>
          blockStore.putIterator(blockId, iterator, putLevel, returnValues)
        case ArrayValues(array) =>
          blockStore.putArray(blockId, array, putLevel, returnValues)
        case ByteBufferValues(bytes) =>
          bytes.rewind()
          blockStore.putBytes(blockId, bytes, putLevel)
      } ......
      if (putLevel.useMemory) { // Keep track of blocks dropped from memory to make room
        result.droppedBlocks.foreach { updatedBlocks += _ }
      }
      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) // BlockStatus of this block
      if (putBlockStatus.storageLevel != StorageLevel.NONE) { ......
          // Report the newly written block to BlockManagerMasterEndpoint, which maintains block metadata
          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
        }
        updatedBlocks += ((blockId, putBlockStatus))
      }
    } finally { ......
  if (putLevel.replication > 1) { // Replicated level (e.g. MEMORY_ONLY_2): copy the block to another node
    data match { ......
        replicate(blockId, bytesAfterPut, putLevel) // Send the block data to other nodes as a backup
    }
  }
  BlockManager.dispose(bytesAfterPut) ......
}
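Two decisions in doPut are easy to model in isolation. First, `putIfAbsent` on the block-info map lets only one writer register a given block ID. Second, the if/else chain picks the store and the `returnValues` flag from the level. The Scala sketch below is hypothetical: it uses string IDs, a `TrieMap` for the block-info map, and case objects for the stores. It leaves out waiting on an existing entry (`waitForReady`), the actual writes and the report to the master. An unspecified level simply throws, standing in for the elided `else` branch.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Hypothetical, simplified model of two decisions in BlockManager.doPut (not Spark's real API).
object DoPutSketch {
  final case class Level(useMemory: Boolean, useOffHeap: Boolean, useDisk: Boolean, replication: Int)

  sealed trait Store
  case object MemoryStore extends Store
  case object ExternalBlockStore extends Store
  case object DiskStore extends Store

  // Same if/else chain as doPut: returns (returnValues, store)
  def chooseStore(level: Level): (Boolean, Store) =
    if (level.useMemory) (true, MemoryStore)
    else if (level.useOffHeap) (false, ExternalBlockStore)
    else if (level.useDisk) (level.replication > 1, DiskStore)
    else throw new IllegalArgumentException(s"no storage level specified: $level")

  final class MiniBlockManager {
    val blockInfo = TrieMap.empty[String, Level] // stand-in for blockId -> BlockInfo
    val written = mutable.ArrayBuffer.empty[(String, Store)]
    val replicated = mutable.ArrayBuffer.empty[String]

    // Returns false when another writer already registered this block id
    def doPut(blockId: String, level: Level): Boolean =
      blockInfo.putIfAbsent(blockId, level) match {
        case Some(_) => false
        case None =>
          val (_, store) = chooseStore(level)
          written += ((blockId, store))
          if (level.replication > 1) replicated += blockId // e.g. MEMORY_ONLY_2
          true
      }
  }
}
```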

putInBlockManager then calls BlockManager.get to read the RDD data back. The source of BlockManager.get:

def get(blockId: BlockId): Option[BlockResult] = {
  val local = getLocal(blockId) // Try the local BlockManager first
  if (local.isDefined) {
    return local
  }
  val remote = getRemote(blockId) // Not found locally: fetch it from a remote BlockManager
  if (remote.isDefined) {
    return remote
  }
  None
}
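The local-then-remote lookup order can be sketched with a hypothetical `MiniBlockManager`. Its local blocks are a `Map`, and its remote executors are a list of `Map`s. The real getRemote asks the driver for the block's locations and fetches the block over the network. `Option.orElse` takes its argument by name, so the remote lookup runs only when the local one misses.

```scala
// Hypothetical model of BlockManager.get's lookup order (not Spark's real API).
object GetSketch {
  final class MiniBlockManager(local: Map[String, String], peers: Seq[Map[String, String]]) {
    var remoteLookups = 0

    def getLocal(blockId: String): Option[String] = local.get(blockId)

    // Ask the peers in order and take the first copy found
    def getRemote(blockId: String): Option[String] = {
      remoteLookups += 1
      peers.collectFirst { case peer if peer.contains(blockId) => peer(blockId) }
    }

    // orElse evaluates its argument only when the local lookup returned None
    def get(blockId: String): Option[String] = getLocal(blockId).orElse(getRemote(blockId))
  }
}
```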

 

Copyright notice
This article was written by [ZH519080]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/173/202206221523254853.html