Spark Learning: Forms of Join in a Distributed Environment
2022-07-24 09:35:00 【I love evening primrose a】
In big data applications, data is usually processed in a distributed environment, so a join computation also has to account for how data is distributed across the network.
In a distributed environment, Spark supports two data distribution modes. The first is shuffle, which exchanges data between the Map stage and the Reduce stage through intermediate files. The second is broadcast variables, which are created on the Driver and distributed by the Driver to each Executor. From the perspective of data distribution, joins can therefore be divided into Shuffle Join and Broadcast Join.
For example:
import org.apache.spark.sql.DataFrame
import spark.implicits._
val seq = Seq((1,"li",22,"Male"),(2,"shi",24,"Female"),(3,"ming",26,"Female"))
val employees:DataFrame = seq.toDF("id","name","age","gender")
val seq2 = Seq((1,20000),(2,30000),(3,40000),(4,50000))
val salaries:DataFrame = seq2.toDF("id","salary")
1. Shuffle Join
- Take the employee and salary tables above as an example. If employees and salaries are joined on the id column, then employee rows and salary rows with the same id value must end up in the same Executor process. Spark SQL can then use HJ (hash join), SMJ (sort-merge join), or NLJ (nested loop join) to perform the join in parallel, at the granularity of Executors (processes).
- Without developer intervention, Spark uses Shuffle Join by default to complete joins in a distributed environment (unless one side is smaller than spark.sql.autoBroadcastJoinThreshold, in which case Spark SQL broadcasts it automatically).
- Spark SQL defaults to Shuffle Join because it is a universal fallback: regardless of the data size and regardless of whether memory is sufficient, Shuffle Join can always complete the join computation.
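The idea behind a shuffle join can be sketched without Spark at all. The following is a minimal, hypothetical simulation on plain Scala collections, not Spark's actual implementation: `ShuffleJoinSketch`, `partitionByKey`, and `shuffleJoin` are names made up for illustration, and each partition merely stands in for one Executor.

```scala
// Hypothetical, Spark-free simulation of a shuffle join on plain Scala collections.
object ShuffleJoinSketch {
  type Employee = (Int, String, Int, String) // id, name, age, gender
  type Salary   = (Int, Int)                 // id, salary

  val employees: Seq[Employee] =
    Seq((1, "li", 22, "Male"), (2, "shi", 24, "Female"), (3, "ming", 26, "Female"))
  val salaries: Seq[Salary] =
    Seq((1, 20000), (2, 30000), (3, 40000), (4, 50000))

  // "Shuffle": route every row to a partition chosen by hashing its join key,
  // so rows with the same id always land in the same partition.
  def partitionByKey[T](rows: Seq[T], key: T => Int, numPartitions: Int): Map[Int, Seq[T]] =
    rows.groupBy(r => Math.floorMod(key(r).hashCode, numPartitions))

  def shuffleJoin(numPartitions: Int): Seq[(Int, String, Int)] = {
    val empParts = partitionByKey[Employee](employees, _._1, numPartitions)
    val salParts = partitionByKey[Salary](salaries, _._1, numPartitions)
    // Each partition stands in for one Executor, which joins only its local rows.
    (0 until numPartitions).flatMap { p =>
      val localEmployees = empParts.getOrElse(p, Seq.empty)
      val localSalaries  = salParts.getOrElse(p, Seq.empty)
      for {
        s <- localSalaries
        e <- localEmployees
        if s._1 == e._1
      } yield (e._1, e._2, s._2) // (id, name, salary)
    }
  }
}
```

Both tables are moved in this sketch, which is exactly the cost that the broadcast approach tries to avoid when one table is small.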
2. Broadcast Join
Spark can create broadcast variables not only on ordinary variables but also on distributed datasets (RDD, DataFrame). For the two tables involved in a join, we can therefore wrap the smaller one in a broadcast variable.
import org.apache.spark.sql.functions.broadcast
// Create broadcast variables for the employee table
val bcEmployees = broadcast(employees)
val joinDF:DataFrame = salaries.join(bcEmployees,salaries("id") === employees("id"),"inner")
During the execution of a Broadcast Join, Spark SQL first collects the data partitions of the employees table from the Executors and builds the broadcast variable bcEmployees on the Driver. The broadcast variable carrying the employees data is then distributed to every Executor. The much larger salaries table stays where it is and can be joined locally with the employee rows that match it.
Although creating and distributing a broadcast variable also consumes network bandwidth, only the small table is sent over the network, whereas a Shuffle Join redistributes both tables in full. Spark SQL's execution performance is therefore usually much better with a Broadcast Join.
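The collect-then-distribute flow described above can be imitated on plain Scala collections. This is only a sketch under stated assumptions, not how Spark implements broadcasting: `BroadcastJoinSketch` and its members are hypothetical names, the employee ids are assumed to be unique, and a read-only `Map` plays the role of the broadcast variable.

```scala
// Hypothetical, Spark-free simulation of a broadcast join.
object BroadcastJoinSketch {
  // Large table: already split into partitions, which never move.
  val salaryPartitions: Seq[Seq[(Int, Int)]] =
    Seq(Seq((1, 20000), (2, 30000)), Seq((3, 40000), (4, 50000)))

  // Small table: gathered in one place (the Driver in Spark).
  val employees: Seq[(Int, String)] = Seq((1, "li"), (2, "shi"), (3, "ming"))

  // Stand-in for the broadcast variable; assumes ids are unique.
  val broadcastEmployees: Map[Int, String] = employees.toMap

  // Every partition probes the same read-only copy locally.
  def broadcastJoin(): Seq[(Int, String, Int)] =
    salaryPartitions.flatMap { partition =>
      partition.flatMap { case (id, salary) =>
        broadcastEmployees.get(id).map(name => (id, name, salary))
      }
    }
}
```

Only the small employee map is copied in this sketch; the salary partitions are read in place, which mirrors why the approach saves network traffic.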
3. Join Strategies Supported by Spark SQL
Whether it is a Shuffle Join or a Broadcast Join, once the data has been distributed, any of the three implementation mechanisms (HJ, NLJ, SMJ) can in theory be used. Two distribution modes combined with three implementation mechanisms give 6 distributed join strategies in total.
Spark SQL supports five of them. The exception, highlighted in red in the original figure, is Broadcast SMJ.
| Join condition | Join strategy priority |
|---|---|
| Equi-join | Broadcast HJ > Shuffle SMJ > Shuffle HJ |
| Non-equi join | Broadcast NLJ > Shuffle NLJ |
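To see which strategy the planner picks, one option is to attach a join hint and print the physical plan. The sketch below assumes Spark 3.0 or later, where the hint names `broadcast`, `merge`, `shuffle_hash`, and `shuffle_replicate_nl` are documented. `JoinHintSketch` and `joinWithHint` are hypothetical names, and the exact plan text depends on the Spark version and configuration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JoinHintSketch {
  // Join hint names documented for Spark 3.0+ (assumed to be available here).
  val hints: Seq[String] = Seq("broadcast", "merge", "shuffle_hash", "shuffle_replicate_nl")

  // Builds the same two tables as in the article and joins them,
  // hinting the employees side with the requested strategy.
  def joinWithHint(spark: SparkSession, hint: String): DataFrame = {
    import spark.implicits._
    val employees = Seq((1, "li", 22, "Male"), (2, "shi", 24, "Female"), (3, "ming", 26, "Female"))
      .toDF("id", "name", "age", "gender")
    val salaries = Seq((1, 20000), (2, 30000), (3, 40000), (4, 50000)).toDF("id", "salary")
    salaries.join(employees.hint(hint), Seq("id"), "inner")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-hint-sketch").getOrCreate()
    hints.foreach { h =>
      println(s"--- hint: $h ---")
      joinWithHint(spark, h).explain() // prints the physical plan chosen for this hint
    }
    spark.stop()
  }
}
```

A hint is a request rather than a guarantee: the planner may fall back to another strategy when the hinted one does not apply to the join type or condition.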
HJ is no less efficient than SMJ, so why does Spark SQL prefer Shuffle SMJ for equi-joins?
- In the Shuffle implementation, the map stage already sorts the data, which suits SMJ. For two sorted tables, SMJ has complexity O(M+N), roughly the same as HJ's O(M+N). SMJ is also far more stable than HJ: when memory is limited, SMJ can make full use of the disk to complete the join. Shuffle SMJ is therefore given priority.
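The comparison between HJ and SMJ can be made concrete with a small in-memory sketch. This is an illustration under simplifying assumptions, not Spark's code: `JoinMechanismSketch` is a hypothetical name, rows are reduced to (key, payload) pairs, and the sort-merge version expects both inputs to be sorted by key already, as they would be after a sorting shuffle.

```scala
object JoinMechanismSketch {
  type Row = (Int, Int) // (join key, payload)

  // Hash join: build a hash table on one side, then probe it with the other.
  // The whole build side has to fit in memory.
  def hashJoin(left: Seq[Row], right: Seq[Row]): Seq[(Int, Int, Int)] = {
    val built: Map[Int, Seq[Row]] = right.groupBy(_._1)
    for {
      (k, l) <- left
      (_, r) <- built.getOrElse(k, Seq.empty)
    } yield (k, l, r)
  }

  // Sort-merge join: both inputs must already be sorted by key.
  // Two cursors advance in step, so only the current rows need to be in memory.
  def sortMergeJoin(left: IndexedSeq[Row], right: IndexedSeq[Row]): Seq[(Int, Int, Int)] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[(Int, Int, Int)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val lk = left(i)._1
      val rk = right(j)._1
      if (lk < rk) i += 1
      else if (lk > rk) j += 1
      else {
        // Emit one output row per right-side row in the run of equal keys.
        var jj = j
        while (jj < right.length && right(jj)._1 == lk) {
          out += ((lk, left(i)._2, right(jj)._2))
          jj += 1
        }
        i += 1
      }
    }
    out.toSeq
  }
}
```

The merge loop touches each row a bounded number of times when keys are mostly unique, which is where the O(M+N) figure comes from. Because it only walks forward through sorted input, sorted runs can also be streamed from disk, whereas the hash table in `hashJoin` must be held in memory.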