Analysis of the Spark Shuffle Read-Path Source Code
2022-06-22 16:46:00 【ZH519080】
The abstract interface for computing a specified partition is the compute method. Take CoGroupedRDD as an example (ShuffleRDD works just as well; the details of compute differ, but the class and method used to read shuffle data are the same). Source code:
override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
  val split = s.asInstanceOf[CoGroupPartition]
  val numRdds = dependencies.length
  val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
  for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
    case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
      val dependencyPartition = split.narrowDeps(depNum).get.split
      // For a narrow dependency, read the parent RDD's data directly
      val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
      rddIterators += ((it, depNum))
    case shuffleDependency: ShuffleDependency[_, _, _] =>
      // First obtain the ShuffleManager from SparkEnv, then take the shuffleHandle that was
      // registered with the ShuffleManager from the ShuffleDependency. Using the shuffleHandle
      // and the partition id of the current task, obtain a ShuffleReader and call its read()
      // method to read the map output of the shuffle.
      val it = SparkEnv.get.shuffleManager
        .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
        .read()
      rddIterators += ((it, depNum))
  }
  val map = createExternalMap(numRdds)
  for ((it, depNum) <- rddIterators) {
    map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
  }
  context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
  context.internalMetricsToAccumulators(
    InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
  new InterruptibleIterator(context,
    map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}

As the source shows, the compute operation of a wide-dependency (shuffle-dependency) RDD ultimately obtains a data reader through the getReader method of the ShuffleManager instance held by SparkEnv, and then calls the reader's read method to fetch the shuffle data of the specified partition range.
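ShuffleRDD follows the same pattern but has only a single shuffle dependency. For comparison, a minimal sketch of what its compute roughly looks like in this version of the source (simplified):

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  // The only dependency of a ShuffleRDD is its ShuffleDependency
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // Ask the ShuffleManager for a reader that covers exactly this partition and read it
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}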
The ShuffleReader trait is implemented by its subclass BlockStoreShuffleReader. The source code of BlockStoreShuffleReader's read method:
/** Read and combine the key-value pairs for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
  val blockFetcherItr = new ShuffleBlockFetcherIterator(
    context,
    blockManager.shuffleClient,
    blockManager,
    // When a ShuffleMapTask finishes, its metadata is registered with mapOutputTracker;
    // here that metadata is obtained from mapOutputTracker for the specified partition range
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
    // Defaults to 48 MB. This caps the data in flight as a parallel-read strategy: it avoids
    // using too much bandwidth on any target machine while still fetching in parallel
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)

  // For each fetched block, the iterator yields its unique BlockId and the corresponding
  // input stream; wrap the stream for compression (lz4, lzf and snappy are supported)
  val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
    blockManager.wrapForCompression(blockId, inputStream)
  }

  val ser = Serializer.getSerializer(dep.serializer)
  val serializerInstance = ser.newInstance()

  // Create a key-value iterator for each wrapped stream
  val recordIter = wrappedStreams.flatMap { wrappedStream =>
    serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
  }

  // Update the task context's shuffle read metrics for every record read
  val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
  val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
    recordIter.map(record => {
      readMetrics.incRecordsRead(1)
      record
    }),
    context.taskMetrics().updateShuffleReadMetrics())

  // An interruptible iterator must be used in order to support task cancellation
  val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

  // Aggregate the fetched data
  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    if (dep.mapSideCombine) { // The fetched data has already been combined on the map side
      val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
      // The per-partition results already merged by key on the map side are combined again
      // here; map-side combining greatly reduces network transfer
      dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
    } else { // Only reduce-side aggregation is needed
      val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
      dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
    }
  } else { // No aggregation is needed
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
  }

  // Sort-based shuffle sorts map output only by partition id by default, so records inside a
  // partition are not sorted. The keyOrdering field indicates whether the data within a
  // partition should be sorted; if an ordering is defined, sort the output
  dep.keyOrdering match { // Decide whether sorting is required
    case Some(keyOrd: Ordering[K]) =>
      // To reduce memory pressure and avoid GC overhead, an external sorter is used to sort
      // the data. When memory cannot hold the sorted data, the spark.shuffle.spill setting
      // decides whether to spill to disk; spilling is enabled by default
      val sorter =
        new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = Some(ser))
      sorter.insertAll(aggregatedIter)
      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
      context.internalMetricsToAccumulators(
        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
      CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
    case None =>
      aggregatedIter // If the partition data does not need to be sorted, return it directly
  }
}

In BlockStoreShuffleReader's read method, the ShuffleBlockFetcherIterator constructor is invoked, which triggers ShuffleBlockFetcherIterator.initialize; initialize calls splitLocalRemoteBlocks, fetchUpToMaxBytes and fetchLocalBlocks in turn, as the sketch below shows.
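A simplified sketch of what initialize does in this version of the source (logging and minor details omitted):

private[this] def initialize(): Unit = {
  // Register a task-completion callback so buffers are released even if the task fails
  context.addTaskCompletionListener(_ => cleanup())
  // Split the blocks into local ones and per-address remote FetchRequests
  val remoteRequests = splitLocalRemoteBlocks()
  // Enqueue the remote requests in random order to spread the load across executors
  fetchRequests ++= Utils.randomize(remoteRequests)
  // Send the initial batch of requests, staying within maxBytesInFlight
  fetchUpToMaxBytes()
  // Read the blocks that live on this executor while remote fetches are in flight
  fetchLocalBlocks()
}

First, the source code of ShuffleBlockFetcherIterator's splitLocalRemoteBlocks method: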
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
  // Fetch from at most 5 nodes in parallel at any given time, so each request is capped at
  // spark.reducer.maxSizeInFlight (48 MB by default) / 5
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val remoteRequests = new ArrayBuffer[FetchRequest]
  var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) {
    totalBlocks += blockInfos.size
    if (address.executorId == blockManager.blockManagerId.executorId) {
      // The blocks live on this node: the data and the BlockManager are on the same node,
      // so filter out empty blocks and put the rest directly into localBlocks
      localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
      numBlocksToFetch += localBlocks.size
    } else { // The blocks are not local
      val iterator = blockInfos.iterator
      var curRequestSize = 0L
      var curBlocks = new ArrayBuffer[(BlockId, Long)]
      while (iterator.hasNext) { // BlockId format: shuffle_<shuffleId>_<mapId>_<reduceId>
        val (blockId, size) = iterator.next()
        if (size > 0) { // Skip empty blocks
          curBlocks += ((blockId, size))
          remoteBlocks += blockId // Record the BlockId of each block on a remote machine
          numBlocksToFetch += 1
          curRequestSize += size
        } else if (size < 0) {
          throw new BlockException(blockId, "Negative block size " + size)
        }
        if (curRequestSize >= targetRequestSize) {
          // Once curRequestSize reaches maxBytesInFlight / 5, wrap the accumulated block
          // information (location, blockId and size) into a FetchRequest
          remoteRequests += new FetchRequest(address, curBlocks)
          curBlocks = new ArrayBuffer[(BlockId, Long)]
          curRequestSize = 0
        }
      }
      // Add the remaining blocks of this address as one final request
      if (curBlocks.nonEmpty) {
        remoteRequests += new FetchRequest(address, curBlocks)
      }
      // Note: a FetchRequest may use excessive memory; if a single block is too large,
      // fetching it can take too much memory and cause an OOM
    }
  }
  remoteRequests
}

ShuffleBlockFetcherIterator's fetchUpToMaxBytes method sends the requests that fetch remote data; a request is only sent while the bytes already in flight plus the size of the next request do not exceed maxBytesInFlight:
private def fetchUpToMaxBytes(): Unit = {
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }
}

After the remote data has been requested through fetchUpToMaxBytes, local data is read through the fetchLocalBlocks method. The source code of ShuffleBlockFetcherIterator's fetchLocalBlocks:
private[this] def fetchLocalBlocks() {
  val iter = localBlocks.iterator
  while (iter.hasNext) {
    val blockId = iter.next()
    try {
      // Read the block straight from the local BlockManager; no network is involved
      val buf = blockManager.getBlockData(blockId)
      shuffleMetrics.incLocalBlocksFetched(1)
      shuffleMetrics.incLocalBytesRead(buf.size)
      buf.retain()
      results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf))
    } catch {
      case e: Exception =>
        // Record the failure and stop fetching any further local blocks
        results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))
        return
    }
  }
}

To read a local data block, fetchLocalBlocks actually calls BlockManager's getBlockData method, which in turn calls the getBlockData of either IndexShuffleBlockResolver or FileShuffleBlockResolver (both implement the ShuffleBlockResolver trait):
The overall flow is ShuffleBlockFetcherIterator.fetchLocalBlocks -> BlockManager.getBlockData -> ShuffleBlockResolver.getBlockData.
IndexShuffleBlockResolver's getBlockData locates its files through DiskBlockManager's getFile method, while FileShuffleBlockResolver's getBlockData ends in a FileSegmentManagedBuffer constructor call. A sketch of IndexShuffleBlockResolver.getBlockData follows.
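As a concrete illustration, here is roughly what IndexShuffleBlockResolver.getBlockData looks like in this version of the source (slightly simplified): it skips to the two offsets recorded for reduceId in the per-map index file and returns the corresponding byte range of the data file as a FileSegmentManagedBuffer.

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
  // The block is a byte range of the single consolidated map output file; the index file
  // stores the start offset of every reduce partition within that file
  val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    // Each offset is a Long (8 bytes); skip to the entry for this reduce partition
    ByteStreams.skipFully(in, blockId.reduceId * 8)
    val offset = in.readLong()
    val nextOffset = in.readLong()
    // Return the [offset, nextOffset) segment of the data file
    new FileSegmentManagedBuffer(
      transportConf,
      getDataFile(blockId.shuffleId, blockId.mapId),
      offset,
      nextOffset - offset)
  } finally {
    in.close()
  }
}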