Source code analysis of spark cache
2022-06-22 16:47:00 【ZH519080】
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = { ......
  if (storageLevel == StorageLevel.NONE) { // first persist call: register the RDD only once
    sc.cleaner.foreach(_.registerRDDForCleanup(this)) // let ContextCleaner remove the cached blocks once this RDD is no longer referenced
    sc.persistRDD(this) // register and track the persisted RDD in SparkContext
  }
  storageLevel = newLevel
  this
}
When persist is called on an RDD for the first time (its storageLevel is still NONE), the RDD is registered with ContextCleaner through registerRDDForCleanup, so that its cached data can be cleaned up once the RDD is no longer referenced. SparkContext.persistRDD then puts (rdd.id, rdd) into persistentRdds, a HashMap whose key is rdd.id and whose value is a timestamped reference to the RDD. persistentRdds keeps track of every RDD that has been marked as persisted.
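A toy model can illustrate the first-time registration described above. This is a minimal sketch, not Spark code:

* PersistSketch, MiniContext, MiniRdd and Level are hypothetical stand-ins.
* cleanupRegistered stands in for ContextCleaner.
* The real persist also refuses to change a storage level that is already assigned. That check is left out here.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for StorageLevel, SparkContext and RDD.
// They model only the registration bookkeeping, not real Spark behaviour.
object PersistSketch {
  final case class Level(name: String)
  object Level {
    val NONE: Level = Level("NONE")
    val MemoryOnly: Level = Level("MEMORY_ONLY")
  }

  final class MiniContext {
    // rdd.id -> (rdd, registration timestamp), standing in for persistentRdds
    val persistentRdds = mutable.HashMap.empty[Int, (MiniRdd, Long)]
    // ids handed to the (hypothetical) cleaner, standing in for registerRDDForCleanup
    val cleanupRegistered = mutable.ArrayBuffer.empty[Int]

    def persistRDD(rdd: MiniRdd): Unit =
      persistentRdds(rdd.id) = (rdd, System.currentTimeMillis())
  }

  final class MiniRdd(val id: Int, sc: MiniContext) {
    var storageLevel: Level = Level.NONE

    def persist(newLevel: Level): this.type = {
      // Register only while the level is still NONE, i.e. on the first call
      if (storageLevel == Level.NONE) {
        sc.cleanupRegistered += id
        sc.persistRDD(this)
      }
      storageLevel = newLevel
      this
    }
  }
}
```

Calling persist twice on the same RDD registers it once, because the second call sees a level other than NONE.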
Storage/caching phase of an RDD:
When an action is executed on an RDD, DAGScheduler.submitJob is called to submit the job. The RDD does not actually enter the storage/caching phase until the stage's ShuffleMapTask or ResultTask runs. At that point the task's runTask method calls RDD.iterator. The source of RDD.iterator:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
A StorageLevel other than NONE means the RDD has been marked for persistence (cache or persist was called earlier), although its data is not necessarily stored yet. In that case the partition is requested from CacheManager (getOrCompute). Otherwise it is either computed or read from a checkpoint (RDD.computeOrReadCheckpoint). The source of CacheManager.getOrCompute:
def getOrCompute[T](rdd: RDD[T], partition: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = {
  val key = RDDBlockId(rdd.id, partition.index) // block id of this RDD partition
  blockManager.get(key) match { // look the block up in BlockManager
    case Some(blockResult) => // the partition is already cached: return its values
      val existingMetrics = context.taskMetrics
        .getInputMetricsForReadMethod(blockResult.readMethod)
      existingMetrics.incBytesRead(blockResult.bytes)
      val iter = blockResult.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, iter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }
    case None => // the RDD is marked as persisted, but this partition is not in BlockManager
      // Acquire the loading lock for this partition; if another thread is already computing it,
      // wait for that thread and then read its result from BlockManager
      val storedValues = acquireLockForPartition[T](key)
      if (storedValues.isDefined) {
        return new InterruptibleIterator[T](context, storedValues.get)
      }
      try {
        // Still no data: read the partition from the checkpoint directory if the RDD has been
        // checkpointed, otherwise compute it from the parent RDD
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)
        if (context.isRunningLocally) { // a task running locally does not cache its result
          return computedValues
        }
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        // Persist a copy of the checkpointed or recomputed data
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        val metrics = context.taskMetrics
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)
      } finally {......}
  }
}
CacheManager.getOrCompute first tries to read the persisted (cached) data from BlockManager. If the persisted partition really exists, it is returned directly (case Some(blockResult)). An RDD marked for persistence may still have no data available, either because the partition has not been computed yet or because its block was lost. In that case the data is read from the checkpoint directory or computed from the parent RDD (computeOrReadCheckpoint), and a copy of the result is persisted. The source of RDD.computeOrReadCheckpoint:
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context) // after checkpointing, the first parent is the RDD that reads the checkpoint files
  } else {
    compute(split, context)
  }
}
computeOrReadCheckpoint works in two cases:

* The RDD has been checkpointed and the checkpoint data is materialized. The partition is read through firstParent, which after checkpointing is the RDD backed by the checkpoint directory.
* Otherwise, compute(split, context) is called. For a typical RDD this pulls the parent RDD's iterator and applies this RDD's own transformation to the result.
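The read path traced so far (iterator, then getOrCompute, then computeOrReadCheckpoint) can be condensed into a small model. It is a sketch under simplifying assumptions:

* ReadPathSketch and its members are hypothetical.
* A mutable map plays the role of BlockManager.
* A checkpoint is modelled as an in-memory map instead of files read through the checkpointed parent.
* The per-partition loading lock (acquireLockForPartition) is omitted.

```scala
import scala.collection.mutable

// Hypothetical model of the read path; not Spark code.
object ReadPathSketch {
  type Key = (Int, Int) // stands in for RDDBlockId(rdd.id, partition.index)

  val blockStore = mutable.HashMap.empty[Key, Vector[Int]] // plays BlockManager
  var computeCalls = 0 // counts how often compute (not the cache/checkpoint) ran

  final case class Rdd(
      id: Int,
      cached: Boolean, // storageLevel != NONE
      checkpoint: Option[Map[Int, Vector[Int]]], // materialized checkpoint data, if any
      f: Int => Vector[Int]) // compute(split): derive the partition from the parent

  def computeOrReadCheckpoint(rdd: Rdd, split: Int): Iterator[Int] =
    rdd.checkpoint match {
      case Some(data) => data(split).iterator // read the checkpointed partition
      case None =>
        computeCalls += 1
        rdd.f(split).iterator // recompute from the parent
    }

  def getOrCompute(rdd: Rdd, split: Int): Iterator[Int] = {
    val key = (rdd.id, split)
    blockStore.get(key) match {
      case Some(values) => values.iterator // cache hit: return the stored values
      case None =>
        // cache miss: read the checkpoint or recompute, then keep a copy
        val values = computeOrReadCheckpoint(rdd, split).toVector
        blockStore(key) = values
        values.iterator
    }
  }

  // Mirrors RDD.iterator: go through the cache only when a storage level is set
  def iterator(rdd: Rdd, split: Int): Iterator[Int] =
    if (rdd.cached) getOrCompute(rdd, split) else computeOrReadCheckpoint(rdd, split)
}
```

The model behaves in three ways:

* Reading a cached partition twice computes it once.
* An uncached RDD is recomputed on every read.
* A checkpointed RDD never calls its compute function.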
When CacheManager.getOrCompute finds no persisted data, the data read from the checkpoint directory or recomputed must be persisted/cached again. CacheManager.putInBlockManager does this. The source of CacheManager.putInBlockManager:
private def putInBlockManager[T](key: BlockId,values: Iterator[T],level: StorageLevel,updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
  val putLevel = effectiveStorageLevel.getOrElse(level)
  if (!putLevel.useMemory) {
    // This RDD will not be cached in memory, so the computed values can be passed to
    // BlockManager directly as an iterator instead of being fully unrolled in memory first
    updatedBlocks ++= blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
    blockManager.get(key) match {
      case Some(v) => v.data.asInstanceOf[Iterator[T]]
      case None =>
        throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  } else {
    // This RDD will be cached in memory. The computed values cannot be handed to BlockManager
    // as an iterator and read back later, because the partition might be dropped from memory
    // before it is read back; unroll them cautiously instead
    blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
      case Left(arr) =>
        // The whole partition was unrolled: cache it in memory
        updatedBlocks ++= blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
        arr.iterator.asInstanceOf[Iterator[T]]
      case Right(it) =>
        // Not enough space to cache this partition in memory
        val returnValues = it.asInstanceOf[Iterator[T]]
        if (putLevel.useDisk) { // e.g. MEMORY_AND_DISK: retry with a disk-only level
          val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
            useOffHeap = false, deserialized = false, putLevel.replication)
          putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
        } else {
          returnValues
        }
    }
  }
}
CacheManager.putInBlockManager: when there is not enough memory for the partition, a MEMORY_AND_DISK partition is written to disk instead. In short, the method re-caches/persists the checkpoint data, or the data computed from the parent RDD, into BlockManager according to the StorageLevel.
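The memory-first, disk-fallback behaviour of putInBlockManager can be sketched as follows. The sketch assumes memory is a budget counted in elements instead of bytes. PutSketch, its Level and its unrollSafely are hypothetical simplifications of the real StorageLevel and MemoryStore.unrollSafely. The real method estimates sizes in bytes and reserves unroll memory.

```scala
import scala.collection.mutable

// Hypothetical model of putInBlockManager's memory path; not Spark code.
object PutSketch {
  final case class Level(useMemory: Boolean, useDisk: Boolean)

  val memoryStore = mutable.HashMap.empty[String, Vector[Int]]
  val diskStore = mutable.HashMap.empty[String, Vector[Int]]

  // Left: the whole partition fit within the budget.
  // Right: the budget was exceeded; the iterator yields the values unrolled so far
  // followed by the untouched rest, so nothing is lost.
  def unrollSafely(values: Iterator[Int], budget: Int): Either[Vector[Int], Iterator[Int]] = {
    val buf = mutable.ArrayBuffer.empty[Int]
    while (values.hasNext && buf.size < budget) buf += values.next()
    if (!values.hasNext) Left(buf.toVector)
    else Right(buf.iterator ++ values)
  }

  def putInBlockManager(key: String, values: Iterator[Int], level: Level, budget: Int): Iterator[Int] =
    if (!level.useMemory) {
      // Not cached in memory: stream straight to "disk", then read it back
      diskStore(key) = values.toVector
      diskStore(key).iterator
    } else {
      unrollSafely(values, budget) match {
        case Left(arr) =>
          memoryStore(key) = arr // the whole partition fits: cache it in memory
          arr.iterator
        case Right(it) if level.useDisk =>
          // Does not fit: retry with a disk-only level, as MEMORY_AND_DISK does
          putInBlockManager(key, it, level.copy(useMemory = false), budget)
        case Right(it) =>
          it // memory only and no room: return the values without caching them
      }
    }
}
```

Returning the already-unrolled prefix concatenated with the rest of the iterator means an abandoned unroll loses no values. That is what lets the disk fallback, or the uncached return, still deliver the full partition.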
CacheManager.putInBlockManager calls BlockManager.putIterator to store the result values in BlockManager as an iterator. putIterator in turn calls BlockManager.doPut, which is where the data is actually written into BlockManager. putArray, used on the memory path, also goes through doPut. The source of BlockManager.doPut:
private def doPut(blockId: BlockId, data: BlockValues, level: StorageLevel, tellMaster: Boolean = true, effectiveStorageLevel: Option[StorageLevel] = None)
    : Seq[(BlockId, BlockStatus)] = { ......
  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  val putBlockInfo = { // create a BlockInfo for this block
    val tinfo = new BlockInfo(level, tellMaster)
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) // atomic: only one writer registers the block
    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) { // the block already exists on this node: do not add it again
        return updatedBlocks
      }
      oldBlockOpt.get
    } else {
      tinfo
    }
  } ......
  putBlockInfo.synchronized { // lock the BlockInfo so concurrent threads access it one at a time
    var marked = false
    try { // choose the BlockStore from the storage level: MemoryStore, ExternalBlockStore or DiskStore
      val (returnValues, blockStore: BlockStore) = {
        if (putLevel.useMemory) { // MemoryStore (tried first even if useDisk is also set)
          (true, memoryStore)
        } else if (putLevel.useOffHeap) { // ExternalBlockStore
          (false, externalBlockStore)
        } else if (putLevel.useDisk) { // DiskStore
          (putLevel.replication > 1, diskStore)
        } else {......}
      }
      val result = data match { // actually write the data into the chosen store: IteratorValues, ArrayValues or ByteBufferValues
        case IteratorValues(iterator) =>
          blockStore.putIterator(blockId, iterator, putLevel, returnValues)
        case ArrayValues(array) =>
          blockStore.putArray(blockId, array, putLevel, returnValues)
        case ByteBufferValues(bytes) =>
          bytes.rewind()
          blockStore.putBytes(blockId, bytes, putLevel)
      } ......
      if (putLevel.useMemory) { // keep track of the blocks dropped from memory to make room
        result.droppedBlocks.foreach { updatedBlocks += _ }
      }
      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) // BlockStatus of this block
      if (putBlockStatus.storageLevel != StorageLevel.NONE) { ......
        // Report the newly written block to BlockManagerMasterEndpoint, which keeps the block metadata in sync
        reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
        updatedBlocks += ((blockId, putBlockStatus))
      }
    } finally { ...... }
  }
  if (putLevel.replication > 1) { // replicated levels (the _2 variants) copy the block to other nodes as a backup
    data match { ......
      replicate(blockId, bytesAfterPut, putLevel) // replicate sends the block data to other nodes
    }
  }
  BlockManager.dispose(bytesAfterPut) ......
}
When the storage level does not use memory, CacheManager.putInBlockManager reads the stored data back with BlockManager.get. The source of BlockManager.get:
def get(blockId: BlockId): Option[BlockResult] = {
  val local = getLocal(blockId) // first look for the block in the local stores
  if (local.isDefined) {
    return local
  }
  val remote = getRemote(blockId) // not found locally: fetch it from a remote BlockManager
  if (remote.isDefined) {
    return remote
  }
  None
}
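Two ideas from doPut and get fit into one toy model:

* picking a store from the storage level (memory first, then off-heap, then disk);
* looking a block up locally before asking other nodes.

BlockManagerSketch, Node and their methods are hypothetical. Peers here are plain in-process objects. Real Spark learns block locations and replication peers from BlockManagerMaster and transfers data over the network.

```scala
import scala.collection.mutable

// Hypothetical model of doPut's store selection and get's local-then-remote lookup.
object BlockManagerSketch {
  final case class Level(useMemory: Boolean, useOffHeap: Boolean, useDisk: Boolean, replication: Int = 1)

  final class Node(val name: String) {
    val stores: Map[String, mutable.HashMap[String, Vector[Int]]] = Map(
      "memory" -> mutable.HashMap.empty[String, Vector[Int]],
      "offheap" -> mutable.HashMap.empty[String, Vector[Int]],
      "disk" -> mutable.HashMap.empty[String, Vector[Int]])
    var peers: List[Node] = Nil // other nodes, in place of the master's peer list

    // Same priority as the if/else chain in doPut
    private def chooseStore(level: Level): String =
      if (level.useMemory) "memory"
      else if (level.useOffHeap) "offheap"
      else if (level.useDisk) "disk"
      else throw new IllegalArgumentException("cannot put a block with storage level NONE")

    def doPut(blockId: String, data: Vector[Int], level: Level): Unit = {
      stores(chooseStore(level)).update(blockId, data)
      // replication > 1 (the _2 levels): copy the block to (replication - 1) peers
      peers.take(level.replication - 1).foreach(_.doPut(blockId, data, level.copy(replication = 1)))
    }

    def getLocal(blockId: String): Option[Vector[Int]] =
      stores.values.collectFirst { case s if s.contains(blockId) => s(blockId) }

    def getRemote(blockId: String): Option[Vector[Int]] =
      peers.iterator.map(_.getLocal(blockId)).collectFirst { case Some(v) => v }

    // Local stores first, then peers, else None
    def get(blockId: String): Option[Vector[Int]] =
      getLocal(blockId).orElse(getRemote(blockId))
  }
}
```

With two nodes, a block put with replication = 2 ends up in both. A block stored only on one node is still found by the other node's get, through the remote lookup.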