Spark partition operators: partitionBy, coalesce, repartition
2022-07-25 15:16:00 【The south wind knows what I mean】
Background
Recently I got curious about how df.show() actually fetches its rows. While reading an article on the topic, some of the code in it raised a question for me.
1. The code in question:
val df = Seq((5,5), (6,6), (7,7), (8,8), (1,1), (2,2), (3,3), (4,4)).toDF("col1", "col2")
val df3 = df.repartition(3)
// let's look at the partition structure
df3.rdd.glom().collect()
/* Array(Array([8,8], [1,1], [2,2]), Array([5,5], [6,6]), Array([7,7], [3,3], [4,4])) */
// and let's look at the top 4 rows this time
df3.show(4, false)
/*
+----+----+
|col1|col2|
+----+----+
|8   |8   |
|1   |1   |
|2   |2   |
|5   |5   |
+----+----+
*/
Looking at this output: why do 8, 1 and 2 end up in the same partition? That does not match hash partitioning. Is it range partitioning? Let's look at the partition contents again:
df3.rdd.glom().collect()
/* Array(Array([8,8], [1,1], [2,2]), Array([5,5], [6,6]), Array([7,7], [3,3], [4,4])) */
2. Testing to answer the question:
//todo test RDD Partition
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Seq((5, 5), (6, 6), (7, 7), (8, 8), (1, 1), (2, 2), (3, 3), (4, 4)),8)
println(rdd1.getNumPartitions) //8
- HashPartitioner
// Hash partition
val rdd2: RDD[(Int, Int)] = rdd1.partitionBy(new HashPartitioner(3))
println(rdd2.getNumPartitions)//3
rdd2.mapPartitionsWithIndex((index,iter)=>{
val str: String = iter.map(_._2).mkString(",")
Iterator((index,str))
}).toDF("index","Value").show()
+-----+-----+
|index|Value|
+-----+-----+
| 0| 6,3|
| 1|7,1,4|
| 2|5,8,2|
+-----+-----+
- RangePartitioner
// Range partitioning
val rdd3: RDD[(Int, Int)] = rdd1.partitionBy(new RangePartitioner(3, rdd1))
println(rdd3.getNumPartitions)//3
rdd3.mapPartitionsWithIndex((index,iter)=>{
val str: String = iter.map(_._2).mkString(",")
Iterator((index,str))
}).toDF("index","Value").show()
+-----+-----+
|index|Value|
+-----+-----+
| 0|1,2,3|
| 1|5,6,4|
| 2| 7,8|
+-----+-----+
- repartition
scala> val rdd100 = sc.makeRDD(Seq((5, 5), (6, 6), (7, 7), (8, 8), (1, 1), (2, 2), (3, 3), (4, 4)),8)
rdd100: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[11] at makeRDD at <console>:24
scala> val rdd101 = rdd100.repartition(3)
rdd101: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[15] at repartition at <console>:25
rdd101.glom.collect
res3: Array[Array[(Int, Int)]] = Array(Array(), Array((5,5), (7,7), (2,2), (3,3), (4,4)), Array((6,6), (8,8), (1,1)))
3. Conclusion
partitionBy distributes records according to a Partitioner, whereas repartition uses a random strategy and cannot be given a Partitioner.
Detailed explanation of the partition operators
1.partitionBy

Difference from repartition: partitionBy redistributes data according to the Partitioner you pass in, whereas repartition uses a random strategy and cannot take a Partitioner.
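A minimal sketch of this difference (spark-shell style, assuming `sc` is an active SparkContext; ModPartitioner is a hypothetical custom partitioner written for illustration, not a Spark class): partitionBy accepts any Partitioner you hand it, while repartition only takes a target partition count.
import org.apache.spark.Partitioner

// hypothetical custom partitioner: a record goes to partition key % numPartitions
class ModPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = key.asInstanceOf[Int] % parts
}

val pairs = sc.makeRDD(Seq((5, 5), (6, 6), (7, 7), (8, 8), (1, 1), (2, 2), (3, 3), (4, 4)), 8)
val byMod = pairs.partitionBy(new ModPartitioner(3))   // you decide how keys map to partitions
byMod.glom().collect()      // groups keys by key % 3, same grouping as the HashPartitioner(3) test above for these Int keys
val byCount = pairs.repartition(3)                     // repartition has no Partitioner parameter, only a target count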
2.coalesce
Used to change the number of partitions. When shuffle is enabled it assigns each record a randomly generated key and uses it to spread the data evenly. Only the target number of partitions (plus a shuffle flag) can be passed in; a Partitioner cannot be specified.
val sc = new SparkContext()
val inputRDD = sc.parallelize(Array[(Int, Char)]((3, 'c'), (3, 'f'), (1, 'a'), (4, 'd'), (1, 'h'), (2, 'b'), (5, 'e'), (2, 'g')), 5)
var coalesceRDD = inputRDD.coalesce(2) // Figure 3.19, diagram 1
coalesceRDD = inputRDD.coalesce(6) // Figure 3.19, diagram 2
coalesceRDD = inputRDD.coalesce(2, true) // Figure 3.19, diagram 3
coalesceRDD = inputRDD.coalesce(6, true) // Figure 3.19, diagram 4
- Reduce the number of partitions
var coalesceRDD = inputRDD.coalesce(2) // Figure 3.19, diagram 1

As shown in diagram 1 of Figure 3.19, rdd1 has 5 partitions. When coalesce(2) reduces it to two partitions, Spark simply merges adjacent partitions to obtain rdd2, which forms a many-to-one NarrowDependency. The drawback is that when the partitions of rdd1 differ greatly in size, merging them directly can easily cause data skew (some partitions of rdd2 end up with far more or far fewer records than others).
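To see whether the merge leaves partitions unbalanced, one can simply count the records per partition before and after coalesce(2). A minimal sketch, reusing the inputRDD defined above:
// records per partition of the original 5-partition RDD
inputRDD.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size))).collect().foreach(println)
// records per partition after merging adjacent partitions into 2
inputRDD.coalesce(2).mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size))).collect().foreach(println)
// if the original partitions differ a lot in size, the two merged partitions can end up skewed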
- Increase the number of partitions
coalesceRDD = inputRDD.coalesce(6)

As shown in diagram 2 of Figure 3.19, when coalesce(6) tries to raise rdd1's partition count to 6, the resulting rdd2 still has only 5 partitions. This is because coalesce() uses NarrowDependency by default and cannot split one partition into several.
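This is easy to verify on the inputRDD from above: without shuffle, asking for more partitions than the RDD already has leaves the partition count unchanged (a minimal sketch):
println(inputRDD.getNumPartitions)             // 5
println(inputRDD.coalesce(6).getNumPartitions) // still 5: a NarrowDependency cannot split a partition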
- Use shuffle to reduce the number of partitions
coalesceRDD = inputRDD.coalesce(2, true)

As shown in diagram 3 of Figure 3.19, to avoid data skew we can use coalesce(2, shuffle = true) to reduce the number of partitions. With shuffle = true, Spark randomly scrambles the data so that each partition of the resulting RDD is roughly balanced. Concretely, each record of rdd1 is given an extra synthetic key (the MapPartitionsRDD in diagram 3). The key is an Int drawn randomly from [0, numPartitions), e.g. <3,f> => <2,(3,f)> where 2 is the randomly generated key; the key of the next record is incremented by 1, e.g. <1,a> => <3,(1,a)>. Spark then distributes the records of rdd1 across the partitions of rdd2 by the hash of this key, and finally strips the key off (the last MapPartitionsRDD).
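The mechanism can be sketched by hand. The following is only an illustration of the idea, not Spark's internal code: attach a synthetic Int key that starts at a random value per partition and then increments, hash-partition on that key, and finally strip it off again. It assumes the inputRDD defined above.
import scala.util.Random
import org.apache.spark.HashPartitioner

val numParts = 2
val scrambled = inputRDD
  .mapPartitionsWithIndex { (idx, it) =>
    var key = new Random(idx).nextInt(numParts)      // random starting key for this partition
    it.map { rec =>
      val keyed = (key, rec)                         // e.g. (3,'f') becomes (2, (3,'f'))
      key = (key + 1) % numParts                     // next record gets key + 1
      keyed
    }
  }
  .partitionBy(new HashPartitioner(numParts))        // distribute records by the synthetic key
  .map(_._2)                                         // drop the key, keep the original record
println(scrambled.getNumPartitions)                  // 2, with roughly balanced partitions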
- Use shuffle to increase the number of partitions
coalesceRDD = inputRDD.coalesce(6, true)

As shown in diagram 4 of Figure 3.19, using a ShuffleDependency allows the data of one partition to be split across several partitions, which solves the problem that the partition count could not be increased.
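Again easy to verify on the inputRDD from above (a minimal sketch):
val grown = inputRDD.coalesce(6, shuffle = true)
println(grown.getNumPartitions)                                // 6
grown.glom().collect().foreach(p => println(p.mkString(",")))  // the 8 records are now spread over 6 partitions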
3.repartition
repartition = coalesce(numPartitions,true)
From the Spark documentation: repartition(numPartitions) — "Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network."
In other words, it randomly redistributes the data across more or fewer partitions so that the partitions end up roughly balanced, and it always moves data over the network.
From a design point of view, `repartition` is meant to make data more evenly distributed across partitions.
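Since repartition(n) is defined as coalesce(n, shuffle = true), the two calls are interchangeable. A minimal sketch, reusing rdd100 from the spark-shell session above:
val viaRepartition = rdd100.repartition(3)
val viaCoalesce    = rdd100.coalesce(3, shuffle = true)
println(viaRepartition.getNumPartitions)  // 3
println(viaCoalesce.getNumPartitions)     // 3
// both lineages contain a ShuffledRDD, i.e. both insert a full shuffle
println(viaRepartition.toDebugString)
println(viaCoalesce.toDebugString)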
3.1 coalesce and repartition usage
Semantics: repartition(numPartitions) = coalesce(numPartitions, shuffle = true)
coalesce
By default coalesce can only reduce the number of partitions: if shuffle is false and the requested number is larger than the RDD's current partition count, the partition count stays the same. coalesce is therefore mostly used to reduce the number of partitions, which also has the effect of merging small files on output. Reducing the partition count covers 2 cases:
With the default coalesce(parNum, false) there is only one stage, and that stage's parallelism drops to coalesce's parNum, which may hurt performance.
Within a single stage, the partition count set by coalesce takes the highest priority.
With coalesce(parNum, true) there are 2 stages: stage 0 keeps the original parallelism, then the second stage merges partitions and reduces parallelism. This suits jobs where a filter removes most of the data and only a small amount remains.
For example, if an RDD originally has 200 partitions and a filter drops most of the data, 200 partitions are too many, so coalesce(parNum, true) can be used (see the sketch after this section).
repartition
repartition returns an RDD with exactly parNum partitions and always shuffles. It is generally used to increase the number of partitions and raise parallelism.
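A minimal sketch of the two usage patterns described above (the 200-partition figure and the filter condition are illustrative assumptions):
// pattern 1: shrink after a selective filter, merging many near-empty partitions
val wide = sc.parallelize(1 to 1000000, 200)     // hypothetical RDD with 200 partitions
val small = wide.filter(_ % 1000 == 0)           // filter drops most of the data
val merged = small.coalesce(10, true)            // 200 -> 10 partitions, filter still runs at full parallelism
println(merged.getNumPartitions)                 // 10

// pattern 2: grow the partition count to raise parallelism for heavy downstream work
val narrow = sc.parallelize(1 to 1000000, 4)
val widened = narrow.repartition(64)             // always shuffles; 4 -> 64 partitions
println(widened.getNumPartitions)                // 64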
References:
Link to the original text :https://blog.csdn.net/qq_34224565/article/details/109508076