
Spark Learning: How Does Data Association Work in a Distributed Environment?

2022-07-24 09:35:00 I love evening primrose a

In big data applications, data processing is usually carried out in a distributed environment. In that setting, computing a data association (a join) has to take the network distribution step into account.
In a distributed environment, Spark supports two data distribution modes. One is shuffle, which exchanges data between the Map phase and the Reduce phase through intermediate files. The other is broadcast variables: a broadcast variable is created on the Driver and distributed by the Driver to each Executor. From the perspective of data distribution, data association can therefore be divided into Shuffle Join and Broadcast Join.
For example:

import org.apache.spark.sql.DataFrame
import spark.implicits._ // assumes an active SparkSession named `spark`, e.g. in spark-shell

// Employee table: id, name, age, gender
val seq = Seq((1, "li", 22, "Male"), (2, "shi", 24, "Female"), (3, "ming", 26, "Female"))
val employees: DataFrame = seq.toDF("id", "name", "age", "gender")

// Salary table: id, salary
val seq2 = Seq((1, 20000), (2, 30000), (3, 40000), (4, 50000))
val salaries: DataFrame = seq2.toDF("id", "salary")

One. Shuffle Join

  • Take the employee table and salary table above as an example: if employees and salaries are joined on the id column, then employee records and salary records with the same id value must end up in the same Executor. Spark SQL then uses HJ (Hash Join), SMJ (Sort Merge Join), or NLJ (Nested Loop Join) to complete the data association in parallel, with Executors (processes) as the unit of parallelism.
  • Without developer intervention, Spark uses Shuffle Join by default to complete data association in a distributed environment.
  • The reason Spark SQL defaults to Shuffle Join lies in its universal applicability: in any case, no matter the size of the data or whether memory is sufficient, Shuffle Join can always complete the association computation. A sketch of this default path follows below.
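
A minimal sketch of the default Shuffle Join path (the conf call and the variable name shuffleJoinDF are illustrative; setting spark.sql.autoBroadcastJoinThreshold to -1 disables automatic broadcasting, so Spark SQL must shuffle both tables):

// Disable auto-broadcast for illustration, forcing a shuffle-based strategy
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
// A plain equi-join with no hints: Spark SQL falls back to Shuffle Join
val shuffleJoinDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "inner")
shuffleJoinDF.explain() // the physical plan shows Exchange (shuffle) nodes on both sides of the join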

Two. Broadcast Join
Spark can create broadcast variables not only from ordinary variables but also from distributed datasets (RDD, DataFrame). Therefore, of the two tables involved in a join, we can wrap the smaller one in a broadcast variable:

import org.apache.spark.sql.functions.broadcast

// Create a broadcast variable for the smaller employee table
val bcEmployees = broadcast(employees)
val joinDF: DataFrame = salaries.join(bcEmployees, salaries("id") === employees("id"), "inner")
  • During the execution of a Broadcast Join, Spark SQL first collects the partitions of the employees table from each Executor, and then creates the broadcast variable bcEmployees on the Driver.

  • The broadcast variable carrying the employees table's data partitions is distributed to every Executor. The large salary table stays where it is, and each of its partitions can then be joined locally with the complete employee data alongside it.

  • Although creating and distributing broadcast variables also consumes network bandwidth, compared with Shuffle Join, which redistributes both tables across the network in full, Broadcast Join completes the association by distributing only a small amount of data, so Spark SQL's execution performance is clearly much higher. The same join can also be written with a SQL hint, as sketched below.
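
A sketch of the equivalent SQL-hint form (the temp view names e and s and the variable joinBySql are illustrative; BROADCAST is one of the join hints Spark SQL accepts):

// Register illustrative temp views, then hint the optimizer to broadcast the employee side
employees.createOrReplaceTempView("e")
salaries.createOrReplaceTempView("s")
val joinBySql: DataFrame = spark.sql(
  """SELECT /*+ BROADCAST(e) */ s.id, e.name, s.salary
    |FROM s JOIN e ON s.id = e.id""".stripMargin)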

Three. Join Strategies Supported by Spark SQL
Whether it is Shuffle Join or Broadcast Join, once the data has been distributed, any of the three implementation mechanisms HJ, NLJ, and SMJ can in theory be used. Combining the two distribution modes with the three implementation mechanisms therefore yields six distributed Join strategies in total.
[Figure: the six candidate Join strategies (two distribution modes × three implementation mechanisms), with the unsupported combination marked in red]
As shown in the figure, Spark SQL supports five of them (all except the combination marked in red).

Join condition      Join strategy priority
Equi-join           Broadcast HJ > Shuffle SMJ > Shuffle HJ
Non-equi join       Broadcast NLJ > Shuffle NLJ
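
As a quick illustration of the non-equi-join row, a sketch (the >= condition and the variable name nonEquiDF are illustrative):

// No equality condition, so the hash- and sort-based mechanisms do not apply;
// with the employee side broadcast, Spark SQL uses BroadcastNestedLoopJoin
val nonEquiDF: DataFrame = salaries.join(broadcast(employees), salaries("id") >= employees("id"))
nonEquiDF.explain() // the physical plan shows BroadcastNestedLoopJoin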

HJ's execution efficiency is no lower than SMJ's, so why does Spark SQL give priority to Shuffle SMJ over Shuffle HJ for equi-joins?

  • In the Shuffle implementation mechanism, the map phase sorts the data, and that happens to suit SMJ: for two sorted tables, SMJ's complexity is O(M+N), roughly the same as HJ's. Moreover, SMJ's execution is far more stable than HJ's; when memory is limited, SMJ can make full use of the disk to complete the association computation. Therefore, Shuffle SMJ is given priority, as the sketch below shows.
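
A sketch of the configuration behind this preference, assuming the spark.sql.join.preferSortMergeJoin conf (true by default) is the knob in play:

// With preferSortMergeJoin at its default (true), an equi-join over shuffled
// tables compiles to SortMergeJoin in the physical plan
salaries.join(employees, Seq("id")).explain()
// Setting it to false allows Spark SQL to consider Shuffle HJ instead,
// provided the build side is small enough to hash in memory
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")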

Copyright notice
This article was created by [I love evening primrose a]. When reposting, please include a link to the original. Thank you.
https://yzsam.com/2022/204/202207221524152113.html