Spark parameter tuning practice
2022-06-24 07:00:00 【Angryshark_128】
Parameter setting methods
When submitting a Spark task, parameters can be set in the following ways.
Environment variables
Add them to the configuration file spark-env.sh, for example:
export JAVA_HOME=/usr/local/jdk
export SCALA_HOME=/usr/local/scala
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_WEBUI_PORT=8088
export SPARK_WORKER_WEBUI_PORT=8099
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=8g
These settings are global and do not suit every task, so they are best treated as defaults; when a specific task needs different values, set them separately.
Project code
Set them in SparkConf, for example:
val conf=new SparkConf().setMaster("local").setAppName("Myapplication").set("spark.executor.memory","1g")
val sc=new SparkContext(conf)
Or set them through System.setProperty, for example:
System.setProperty("spark.executor.memory","14g")
System.setProperty("spark.worker.memory","16g")
val conf=new SparkConf().setAppName("Simple Application")
val sc=new SparkContext(conf)
Submit script
Set them when the task is submitted, for example:
spark-submit \
  --conf spark.executor.memory=14g \
  --conf spark.worker.memory=16g \
  --name SimpleTest \
  --jars ./lib/*.jar \
  --class com.example.test \
  sparkTest.jar
If there is no specific requirement, my personal suggestion is to set parameters in the task submission script, which is the most flexible approach. The available spark-submit options are listed in the table below, followed by an illustrative example.
| Parameter name | Format | Parameter description |
|---|---|---|
| --master | MASTER_URL | e.g. spark://host:port, mesos://host:port, yarn, yarn-cluster, yarn-client, local |
| --deploy-mode | DEPLOY_MODE | client or cluster, default is client |
| --class | CLASS_NAME | Main class of the application |
| --name | NAME | Name of the application |
| --jars | JARS | Comma-separated local jar packages, added to the driver and executor classpaths |
| --packages | | Comma-separated list of "groupId:artifactId:version" coordinates of jars to include on the driver and executor classpaths |
| --exclude-packages | | Comma-separated list of "groupId:artifactId" coordinates to exclude |
| --repositories | | Comma-separated list of remote repositories |
| --py-files | PY_FILES | Comma-separated ".zip", ".egg" or ".py" files, placed on the PYTHONPATH for Python apps |
| --files | FILES | Comma-separated files, placed in the working directory of each executor |
| --conf | PROP=VALUE | Arbitrary Spark configuration property; defaults are read from conf/spark-defaults.conf |
| --properties-file | FILE | File from which to load extra properties |
| --driver-memory | MEM | Driver memory, default 1G |
| --driver-java-options | | Extra Java options passed to the driver |
| --driver-library-path | | Extra library path passed to the driver |
| --driver-class-path | | Extra classpath passed to the driver |
| --executor-memory | MEM | Memory per executor, default 1G |
| --proxy-user | NAME | User to impersonate when submitting the application |
| --driver-cores | NUM | Number of driver cores, default 1. Only used in standalone cluster deploy mode |
| --supervise | | Restart the driver if it fails. Used with Mesos or standalone |
| --verbose | | Print additional debug information |
| --total-executor-cores | NUM | Total number of cores across all executors. Only used with Mesos or standalone |
| --executor-cores | NUM | Number of cores per executor. Used with YARN or standalone |
| --driver-cores | NUM | Number of driver cores, default 1. Used in YARN cluster mode |
| --queue | QUEUE_NAME | Queue name. Used with YARN |
| --num-executors | NUM | Number of executors to launch, default 2. Used with YARN |
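As an illustration of how several of these options combine, a submission to YARN might look like the following sketch; the class name, jar, and resource sizes are placeholders, not recommendations:
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --name MyApp \
  --num-executors 4 \
  --executor-memory 4g \
  --executor-cores 2 \
  --driver-memory 2g \
  myApp.jar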
Ways to view performance
Spark tasks can be observed in the following ways:
1) Web UI
2) Driver program console logs
3) Logs under the logs folder
4) Logs under the work folder
5) Profiler tools, for example JVM profilers such as YourKit or JConsole, or commands such as jmap and jstack. For a more comprehensive view, cluster-level profiling tools such as Ganglia and Ambari can be used.
Task optimization
Spark task optimization should be approached from the following aspects.
Merge small partitions
Applicable scenario
After a filter operation that removes a large amount of data.
When using Spark, users often filter data with the filter operator. Frequent filtering, or filtering out too much data, creates a large number of small partitions (each holding very little data). Because Spark assigns one task per data partition, too many tasks means each task processes very little data, which causes high thread-switching overhead, leaves many tasks waiting to execute, and results in low parallelism. This is very uneconomical.
Solution
Use repartition/coalesce to re-partition and merge the small partitions:
val result = sc.textFile("1.log").filter(line => line.contains("error")).filter(line => line.contains("info")).repartition(10).collect()
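If the goal is only to reduce the number of partitions, coalesce can be used instead of repartition to avoid a full shuffle. A minimal sketch, with an illustrative partition count:
// coalesce merges partitions without a full shuffle when decreasing the count
val merged = sc.textFile("1.log").filter(line => line.contains("error")).coalesce(10)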
Increase parallelism
Applicable scenario
Reading files in parallel and performing operations such as map, filter and reduce.
Solution
Pass a parallelism argument when calling the operator, e.g. reduceByKey(func, numPartitions).
Configure the spark.default.parallelism parameter (a sketch of both approaches follows these notes).
The official recommendation is 2-3 tasks per CPU core, but in production the degree of parallelism needs to be weighed:
If parallelism is too high, there are too many tasks, and the overhead of starting and switching tasks becomes significant.
If parallelism is too low, there are too few tasks, the cluster's parallel computing power is not exploited, task execution is too slow, and too much data may be combined in memory at once, which can lead to an out-of-memory exception.
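A minimal sketch of both approaches, assuming pairs is an existing key-value RDD and that 200 is a suitable task count for the cluster at hand:
// 1) Per operation: pass numPartitions to the shuffle operator
val counts = pairs.reduceByKey((a, b) => a + b, 200)

// 2) Globally: set spark.default.parallelism (roughly 2-3 tasks per CPU core)
val conf = new SparkConf().set("spark.default.parallelism", "200")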
DAG optimization
Solution
Put as many operators as possible into the same stage, in order to reduce shuffle operations and the overhead of task startup and switching.
Reuse intermediate results by caching them with cache and persist (see the sketch below).
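A minimal sketch of reusing cached data, assuming the same log file as the earlier examples:
import org.apache.spark.storage.StorageLevel

// Cache a dataset that several downstream computations reuse,
// so it is not recomputed from the source for every action.
val errors = sc.textFile("1.log").filter(line => line.contains("error")).cache()   // MEMORY_ONLY
// or, when memory is tight: errors.persist(StorageLevel.MEMORY_AND_DISK)

val errorCount = errors.count()                 // first action materializes the cache
val distinctErrors = errors.distinct().count()  // reuses the cached data instead of re-reading the file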
Memory optimization
Solution
JVM tuning
Minimize the use of objects that hold only a small amount of data.
Every Java object has an object header of roughly 16 bytes, which contains, among other things, a pointer to the object's class. For objects holding only a small amount of data this is extremely wasteful; for example, an object with a single Int field carries a header that takes more space than the data itself.
A Java String incurs about 40 bytes of overhead: the actual characters are stored in a char array together with other information such as the string length, and with UTF-16 encoding each character occupies 2 bytes. As a result, a 10-character string can occupy more than 60 bytes of memory.
Evaluate memory footprint
The best way to estimate how much cluster memory a dataset occupies is to create an RDD that reads the data, load it into the cache, and then check the SparkContext log in the driver console. The log shows how much space each partition occupies (data is written to disk when memory is insufficient), so the user can roughly estimate the total space occupied by the RDD. For example, consider the following log line.
INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)
This means that partition 1 of RDD 0 consumed 717.5 KB of memory.
Adjust data structures
Reduce the consumption beyond the raw data that is specific to Java, such as the pointers in linked structures and the metadata produced by boxing.
When designing and choosing data structures, prefer arrays and primitive types, and try to avoid linked Java or Scala collection types.
Reduce object nesting. For example, using many small objects and pointer-heavy collection structures incurs a large amount of pointer and object-header metadata overhead.
Consider using numeric IDs or enumeration objects instead of strings as keys; a string's metadata and character encoding take up too much space (a sketch follows).
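A sketch of the last point, assuming logs is an RDD[String]; the level codes are purely illustrative:
// Numeric IDs as keys instead of String keys: smaller keys, less GC pressure
object Level { val ERROR = 0; val WARN = 1; val INFO = 2 }

val countsByLevel = logs.map { line =>
  if (line.contains("error")) (Level.ERROR, 1L)
  else if (line.contains("warn")) (Level.WARN, 1L)
  else (Level.INFO, 1L)
}.reduceByKey(_ + _)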
Store RDDs serialized
In the program, the StorageLevel enumeration can be used to configure how RDD data is stored. If the user wants to cache data in memory, the officially recommended option is the Kryo serialization library, because Kryo-serialized objects take less space and perform better than Java-serialized ones.
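A minimal sketch of caching in serialized form, assuming rdd is an existing RDD; combined with the Kryo serializer configured later, this keeps the cached footprint small:
import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_ONLY_SER)   // serialized cache: less memory, some CPU cost to deserialize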
GC tuning
When the JVM needs to reclaim the space occupied by old objects to make room for new ones, its garbage collection algorithm traverses all Java objects, finds the ones no longer in use, and reclaims them.
When designing data structures, prefer those that create fewer objects; for example, use arrays (Array) rather than linked lists (LinkedList), thereby reducing garbage collection overhead.
For GC, an important configuration parameter is how much memory is allocated to RDD caching.
By default, Spark uses 60% of the configured executor memory (spark.executor.memory) to cache RDDs, which leaves the remaining 40% for objects created during task execution.
In some cases tasks slow down and the JVM garbage-collects frequently or runs out of memory; in that situation the percentage can be lowered, for example to 50%, by setting spark.storage.memoryFraction=0.5 (the variable can be configured in spark-env.sh).
Combining this with serialized cache storage to reduce memory usage alleviates garbage collection problems even more effectively (an illustrative configuration follows).
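An illustrative SparkConf sketch of the knobs just discussed; the values are examples, not recommendations:
val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.5")   // lower the RDD cache share from the default 0.6
  .set("spark.executor.memory", "8g")           // total executor memory that the fraction applies to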
OOM optimization
To avoid memory overflow, optimize in the following ways:
Check the program for infinite loops or places that repeatedly create large numbers of unnecessary objects.
Increase the JVM's Xms (initial heap size) and Xmx (maximum heap size) parameters, e.g. set JAVA_OPTS=-Xms256m -Xmx1024m.
Optimize data structures and the related configuration parameters.
Temporary directory space optimization
The configuration parameter spark.local.dir sets Spark's temporary directories on disk; the default is /tmp. During a shuffle, intermediate results are written to these temporary directories; likewise, when an RDD does not fit entirely in memory, the part that cannot be held in memory is written there.
If the temporary directory is too small, a "No space left on device" exception occurs. Multiple disks can be configured, e.g. spark.local.dir=/mnt1/spark,/mnt2/spark,/mnt3/spark, to enlarge Spark's temporary disk space so that more data can be written to disk and I/O becomes faster.
Network transmission optimization
Large task distribution
During task distribution, the task's metadata is serialized, together with the jars and files the task needs.
Tasks are distributed through message passing between actors in the Akka library. Because Spark adopts Scala's functional style, the variables referenced by a function are passed around as closures, so when a large amount of data has to be transferred during task distribution, overall execution slows down.
The configuration parameter spark.akka.frameSize (the default buffer size is 10 MB) can mitigate Akka buffer overflows caused by overly large tasks, but it does not solve the underlying problem.
Use broadcast variables
Broadcast is mainly used to share read-only variables that every task needs during a Spark computation. A broadcast variable is stored only once on each machine rather than shipped once per task, which saves a lot of space; saving space also means less transfer time, so it is more efficient as well.
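A minimal sketch, assuming sc is the SparkContext and the lookup table is illustrative data:
val lookup = Map("error" -> 1, "warn" -> 2, "info" -> 3)   // read-only data needed by all tasks
val lookupBc = sc.broadcast(lookup)                        // one copy per executor, not one per task

// Tasks read the shared value via lookupBc.value instead of capturing lookup in their closures
val flagged = sc.textFile("1.log").filter(line => lookupBc.value.keys.exists(k => line.contains(k)))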
Optimizing overly large collect results
collect asks the SparkContext to turn each partition into an array, return the arrays to the driver node, and merge them into a single array. If the collected data is too large, problems arise: a large number of worker nodes write data back to the same node, slowing down the overall run time, and possibly causing a memory overflow.
Solution
When the final result is too large, store the data in distributed HDFS or another distributed persistence layer (see the sketch below). Storing data in a distributed way reduces both the single-machine I/O overhead and the single-machine memory pressure.
Alternatively, when the data is not too large but still exceeds the Akka transfer buffer, increase the Akka actor's buffer via the parameter spark.akka.frameSize (default 10 MB).
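A minimal sketch of the first option; largeResult stands for the RDD that would otherwise be collected, and the HDFS path is a placeholder:
// Persist the result to a distributed store instead of pulling it back to the driver
largeResult.saveAsTextFile("hdfs://namenode:8020/user/spark/output")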
Serialization optimization
Spark provides two serialization libraries and therefore two serialization modes: serialization with the standard Java serialization library, and serialization with the Kryo library.
Java's standard serialization has good compatibility but is large and slow; Kryo has slightly worse compatibility but is compact and fast. So whenever Kryo can be used, it is recommended.
Setting spark.serializer="org.apache.spark.serializer.KryoSerializer" configures Kryo serialization; this parameter determines the serializer used for network transfer during shuffles and for writing RDD partitions to disk when they do not fit in memory.
If objects are large, the Kryo buffer must be enlarged by increasing the configuration item spark.kryoserializer.buffer.mb; the default is 2 MB, but the value must be large enough to hold the largest serialized object being transferred.
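An illustrative SparkConf sketch; MyRecord is a hypothetical user class, and note that newer Spark versions replace spark.kryoserializer.buffer.mb with spark.kryoserializer.buffer / spark.kryoserializer.buffer.max:
case class MyRecord(id: Int, name: String)   // hypothetical class to register with Kryo

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "64")            // enlarge from the 2 MB default (example value)
  .registerKryoClasses(Array(classOf[MyRecord]))          // registering classes avoids writing full class names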
Compression optimization
Compressing RDD or broadcast data in Spark is one way to improve data throughput and performance. Compressed data greatly reduces disk storage space, and transferring compressed files also reduces disk I/O and network communication overhead. Of course, compression and decompression add extra CPU overhead, but they can save far more I/O and reduce memory usage.
Compression minimizes the disk space and network I/O a file requires, at the cost of extra CPU time for compressing and decompressing, so it is best suited to I/O-intensive jobs, which have spare CPU resources, or to systems that are short on disk space.
Spark currently supports two compression codecs, LZF and Snappy. Snappy offers higher compression speed, while LZF offers a higher compression ratio; users can choose according to their specific needs (an illustrative configuration follows).
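An illustrative configuration choosing Snappy (speed) over LZF (ratio); the values shown are examples:
val conf = new SparkConf()
  .set("spark.io.compression.codec", "snappy")   // or "lzf" for a higher compression ratio
  .set("spark.rdd.compress", "true")             // compress serialized RDD partitions
  .set("spark.broadcast.compress", "true")       // compress broadcast variables before sending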
Batch optimization
Applicable scenario
When calling external resources such as database connections, single-record writes can be turned into batched writes, writing each partition's data in one go. This takes advantage of the database's batch-write optimization, reducing overhead and the pressure on the database.
Before optimization:
rdd.foreach { line =>
  val conn = getConnection()   // one connection per record: very expensive
  conn.write(line)
  conn.close()
}
After optimization:
rdd.foreachPartition { items =>
  // one connection per partition instead of per record
  // (foreachPartition rather than mapPartitions, since we only perform side effects)
  val conn = getConnection()
  for (item <- items) {
    conn.write(item)
  }
  conn.close()
}
Use reduceByKey instead of reduce whenever possible
reduce is an action that collects the results of every task on a single node, whereas reduceByKey is executed in a distributed fashion and therefore does not have reduce's single-node bottleneck.
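A brief contrast, assuming lines is an RDD[String]:
// reduce is an action: each task's result is sent back to a single node
val totalLength = lines.map(line => line.length).reduce((a, b) => a + b)

// reduceByKey is a transformation: values are combined per key on each node before the shuffle
val wordCounts = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)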
Data skew optimization
This is also a key focus of optimization and is summarized in a separate chapter.