
Spark parameter tuning practice

2022-06-24 07:00:00 Angryshark_128

Parameter setting methods

When a Spark task is submitted, parameters can be set in the following ways:

Environment variables

Add them to the spark-env.sh configuration file, for example:

export JAVA_HOME=/usr/local/jdk
export SCALA_HOME=/usr/local/scala
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_WEBUI_PORT=8088
export SPARK_WORKER_WEBUI_PORT=8099
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=8g

This setting is global rather than per-task, so it is best treated as a default; tasks with specific requirements should be configured individually.

Project code

Set them in SparkConf, for example:

val conf = new SparkConf().setMaster("local").setAppName("MyApplication").set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)

Or set them via system properties, for example:

System.setProperty("spark.executor.memory", "14g")
System.setProperty("spark.worker.memory", "16g")
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)

Submission script

Set them when the task is submitted, for example:

spark-submit \
  --conf spark.executor.memory=14g \
  --conf spark.worker.memory=16g \
  --name SimpleTest \
  --jars ./lib/*.jar \
  --class com.example.test \
  sparkTest.jar

If there are no particular requirements, my personal suggestion is to set parameters in the submission script, which is the most flexible approach.

Commonly used spark-submit parameters (name, format, description):

--master MASTER_URL: e.g. spark://host:port, mesos://host:port, yarn, yarn-cluster, yarn-client, or local
--deploy-mode DEPLOY_MODE: client or cluster; the default is client
--class CLASS_NAME: the application's main class
--name NAME: the application name
--jars JARS: comma-separated local jars to include on the driver and executor classpaths
--packages: comma-separated list of "groupId:artifactId:version" Maven coordinates to include on the driver and executor classpaths
--exclude-packages: comma-separated "groupId:artifactId" list to exclude
--repositories: comma-separated list of remote repositories
--py-files PY_FILES: comma-separated ".zip", ".egg", or ".py" files to place on the PYTHONPATH for Python apps
--files FILES: comma-separated files to place in the working directory of each executor
--conf PROP=VALUE: an arbitrary Spark configuration property; defaults are read from conf/spark-defaults.conf
--properties-file FILE: a file to load extra properties from
--driver-memory MEM: driver memory, default 1G
--driver-java-options: extra Java options to pass to the driver
--driver-library-path: extra library path entries to pass to the driver
--driver-class-path: extra classpath entries to pass to the driver
--executor-memory MEM: memory per executor, default 1G
--proxy-user NAME: user to impersonate when submitting the application
--driver-cores NUM: number of driver cores, default 1; only used in standalone cluster deploy mode
--supervise: restart the driver if it fails; used with Mesos or standalone
--verbose: print additional debug output
--total-executor-cores NUM: total cores for all executors; only used with Mesos or standalone
--executor-cores NUM: cores per executor; used with YARN or standalone
--driver-cores NUM: number of driver cores, default 1; used in YARN cluster mode
--queue QUEUE_NAME: the YARN queue to submit to; YARN only
--num-executors NUM: number of executors to launch, default 2; YARN only

Viewing performance

Spark task performance can be viewed in the following ways:

1) The Web UI

2) The driver program's console logs

3) The logs under the logs folder

4) The logs under the work folder

5) Profiler tools, for example JVM profilers such as YourKit and JConsole, or commands such as jmap and jstack. A more comprehensive view can be obtained with cluster-level monitoring tools such as Ganglia and Ambari.

Task optimization

Spark task optimization should be approached from the following aspects:

Merge small partitions

Applicable scenario

This applies after filter operations that filter out a large amount of data.

When using Spark, users often filter data with the filter operator. Frequent filtering, or filtering out too much data, leaves behind a large number of small partitions (each holding very little data). Because Spark launches one task per data partition, too many tasks each processing very little data causes heavy thread-switching overhead, many tasks waiting to run, and low effective parallelism, which is very uneconomical.

Solution

Use repartition/coalesce to merge the partitions:

val result = sc.textFile("1.log")
  .filter(line => line.contains("error"))
  .filter(line => line.contains("info"))
  .repartition(10)
  .collect()
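When the goal is only to reduce the number of partitions, coalesce can avoid the full shuffle that repartition triggers. A minimal sketch, assuming sc is an existing SparkContext and reusing the illustrative 1.log input and partition count from above:

// coalesce lowers the partition count without a full shuffle, which is
// usually cheaper than repartition after heavy filtering
val merged = sc.textFile("1.log")
  .filter(line => line.contains("error"))
  .coalesce(10)                              // target partition count is illustrative
merged.count()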

Increase parallelism

Applicable scenario

Reading files in parallel and performing map/filter/reduce and similar operations.

Solution

  1. Pass a parallelism argument when calling the operator, e.g. reduceByKey(func, numPartitions)

  2. Configure the spark.default.parallelism parameter (see the sketch after this list)

The official recommendation is 2 to 3 tasks per CPU core, but in practice the degree of parallelism has to be weighed:

If parallelism is too high, there are too many tasks and a great deal of task startup and switching overhead;

If parallelism is too low, there are too few tasks, the cluster's parallel computing capacity goes unused and execution is slow; in addition, too much data may be combined in memory at once, which can cause an out-of-memory exception.
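As a rough illustration of both options, here is a minimal sketch; the application name, input path, and parallelism values are illustrative only:

import org.apache.spark.{SparkConf, SparkContext}

// Option 2: set a default parallelism for shuffle operations
val conf = new SparkConf()
  .setAppName("ParallelismExample")
  .set("spark.default.parallelism", "48")    // e.g. 2-3 tasks per CPU core in the cluster
val sc = new SparkContext(conf)

// Option 1: pass an explicit partition count to the operator
val counts = sc.textFile("1.log")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 24)                    // numPartitions given explicitly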

DAG optimization

Solution

  1. Put as many operators as possible into the same stage to reduce shuffle operations and the overhead of task startup and switching

  2. Reuse intermediate results by caching them with cache or persist (see the sketch below)
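A minimal caching sketch, assuming sc is an existing SparkContext and 1.log is the same illustrative input used earlier:

// Cache an RDD that several downstream computations reuse, so it is not
// recomputed for every action
val errors = sc.textFile("1.log").filter(_.contains("error")).cache()
val total = errors.count()                               // first action materializes the cache
val fatal = errors.filter(_.contains("fatal")).count()   // reuses the cached data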

Memory optimization

Solution

JVM tuning

Minimize the use of objects that hold only a small amount of data.

Every Java object has an object header of roughly 16 bytes which contains, among other things, a pointer to the object's class. For objects carrying only a little data this is extremely uneconomical; for an object with a single Int field, the header alone takes more space than the data.

A Java String carries about 40 bytes of overhead: besides keeping the actual characters in a char array, it stores extra information such as the string length, and under UTF-16 encoding each character occupies 2 bytes. As a result, a 10-character string can occupy more than 60 bytes of memory.

Estimating memory footprint

The best way to determine how much cluster memory a dataset occupies is to create an RDD that reads the data, load it into the cache, and then inspect the SparkContext log in the driver console. The log shows how much space each partition occupies (data that does not fit in memory is written to disk), from which the total size of the RDD can be roughly estimated. For example, consider the following log line.

INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)

This means that partition 1 of RDD 0 consumed 717.5 KB of memory.
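A minimal way to produce such log lines, assuming sc is an existing SparkContext and the input path is illustrative:

// Cache the data and trigger an action; the BlockManager lines in the
// driver log then show how much memory each partition occupies
val sample = sc.textFile("1.log")
sample.cache()
sample.count()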

Adjust the data structure

Reduce the overhead of Java-specific information beyond the raw data, such as the pointers in linked structures and the metadata created by wrapper (boxed) types.

When designing and choosing data structures, prefer arrays and primitive types, and avoid linked Java or Scala collection types where possible.

Reduce object nesting. For example, using many small objects and pointer-heavy collection structures incurs a large amount of pointer and object-header metadata overhead.

Consider using numeric IDs or enumeration objects instead of strings as keys, since string metadata and character encoding take up considerable space.
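A small sketch of swapping string keys for numeric IDs before a wide operation; the ID table and data are purely illustrative, and sc is assumed to be an existing SparkContext:

// Map string keys to Int IDs, then aggregate on the compact keys
val idOf = Map("alice" -> 1, "bob" -> 2)
val purchases = sc.parallelize(Seq(("alice", 3.0), ("bob", 5.0), ("alice", 7.0)))
val byId = purchases
  .map { case (name, amount) => (idOf(name), amount) }
  .reduceByKey(_ + _)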

Serialized RDD storage

The StorageLevel enumeration can be used in the program to configure how RDD data is stored. If data is to be cached in memory, the Kryo serialization library is officially recommended, because Kryo-serialized objects occupy less space and perform better than Java-serialized ones.
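A minimal sketch of serialized caching, assuming sc is an existing SparkContext and 1.log is the illustrative input used earlier:

import org.apache.spark.storage.StorageLevel

// Cache the RDD in serialized form; together with Kryo this shrinks the
// in-memory footprint at the cost of some CPU when the data is read back
val logs = sc.textFile("1.log")
logs.persist(StorageLevel.MEMORY_ONLY_SER)
logs.count()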

GC tuning

When the JVM needs to reclaim the space occupied by old objects to make room for new ones, its garbage collection algorithm traverses all Java objects, finds those no longer in use, and reclaims them.

When designing data structures, prefer those that create fewer objects, for example arrays (Array) rather than linked lists (LinkedList), to reduce garbage collection overhead.

An important GC-related configuration is how much memory is allocated to the RDD cache.

By default, Spark uses 60% of the configured executor memory (spark.executor.memory) to cache RDDs, which leaves 40% of the memory for the objects created while tasks execute.

In some cases tasks slow down and the JVM garbage-collects frequently or runs out of memory. In that situation the percentage can be lowered, for example to 50%, by setting spark.storage.memoryFraction=0.5 in the Spark configuration.

At the same time, combining this with serialized cache storage to reduce memory usage mitigates garbage collection problems even more effectively.
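A minimal configuration sketch; the application name is illustrative, and note that spark.storage.memoryFraction applies to older Spark releases (newer ones use the unified spark.memory.fraction instead):

import org.apache.spark.{SparkConf, SparkContext}

// Reserve less executor memory for the RDD cache so running tasks have
// more headroom, which eases GC pressure
val conf = new SparkConf()
  .setAppName("GcTuningExample")
  .set("spark.storage.memoryFraction", "0.5")
val sc = new SparkContext(conf)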

OOM optimization

To avoid out-of-memory errors, optimize along the following lines:

  1. Inspect the program for infinite loops or places that needlessly create large numbers of objects over and over;

  2. Increase the JVM's Xms (initial heap size) and Xmx (maximum heap size) parameters, e.g. set JAVA_OPTS=-Xms256m -Xmx1024m;

  3. Optimize data structures and the related configuration parameters.

Temporary directory space optimization

The spark.local.dir parameter configures Spark's temporary directory on disk; the default is /tmp. During a shuffle, Spark writes intermediate results to this directory, and when an RDD cannot be held entirely in memory, the data that does not fit is also written there.

If the temporary directory is too small, a "No space left on device" exception occurs. Multiple disks can also be configured, e.g. spark.local.dir=/mnt1/spark,/mnt2/spark,/mnt3/spark, to enlarge Spark's temporary disk space so that more data can be spilled and I/O is spread across devices.
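A minimal sketch of configuring several disks, with illustrative mount points; note that on YARN or standalone clusters the cluster manager's own settings may override spark.local.dir:

import org.apache.spark.SparkConf

// Spread Spark's temporary and shuffle files across several disks
val conf = new SparkConf()
  .setAppName("LocalDirExample")
  .set("spark.local.dir", "/mnt1/spark,/mnt2/spark,/mnt3/spark")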

Network transmission optimization

Large task distribution

When a task is distributed, the task's metadata is serialized together with the jars and files the task needs.

Tasks are distributed through message passing between actors in the Akka library. Because Spark uses Scala's functional style, the variables a function refers to are shipped as part of its closure, so when a lot of data has to travel with the task at distribution time, overall execution slows down.

The spark.akka.frameSize parameter (the default buffer size is 10 MB) can mitigate the Akka buffer overflow caused by oversized tasks, but it does not solve the underlying problem.

Use broadcast variables

Broadcast is mainly used to share read-only variables that every task needs during the computation. A broadcast variable keeps only one copy on each machine instead of one copy per task, which saves a lot of space; saving space also means less transmission time, so it is efficient.
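A minimal broadcast sketch, assuming sc is an existing SparkContext; the lookup table and input are illustrative:

// Broadcast a read-only lookup table once per executor instead of
// shipping it inside every task closure
val severity = Map("ERROR" -> 3, "WARN" -> 2, "INFO" -> 1)
val bSeverity = sc.broadcast(severity)
val levels = sc.textFile("1.log")
  .map(line => bSeverity.value.getOrElse(line.split(" ")(0), 0))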

Optimizing oversized collect results

collect turns each partition into an array on the executors, returns them to the driver, and merges all the partition arrays into one. If the collected data is too large, problems follow: many worker nodes write data back to the same node, which slows down the overall run or even causes an out-of-memory error.

Solution

When the final collected result is too large, store the data in distributed HDFS or another distributed persistence layer instead. Distributing the storage reduces single-machine I/O overhead and memory pressure.
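A minimal sketch of the HDFS approach, assuming sc is an existing SparkContext; the input and output paths are illustrative:

// Write the large result out from the executors instead of pulling it
// back to the driver with collect
val errors = sc.textFile("1.log").filter(_.contains("error"))
errors.saveAsTextFile("hdfs:///tmp/error-lines")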

Alternatively, when the data is not that large but still exceeds the Akka transmission buffer, increase the Akka actor buffer via the spark.akka.frameSize parameter (default 10 MB).

Serialization optimization

Spark provides two serialization libraries and two corresponding modes: serialization with the standard Java serialization library, and serialization with the Kryo library.

Java's standard serialization is highly compatible but produces large output and is slow; Kryo is slightly less compatible but compact and fast. So whenever Kryo can be used, it is the recommended choice.

Whether Kryo is used is configured with spark.serializer="org.apache.spark.serializer.KryoSerializer". This setting determines the serializer used for network transfer during a shuffle and for writing partitions to disk when an RDD does not fit in memory.

If objects are large, the Kryo buffer capacity must be increased via the spark.kryoserializer.buffer.mb configuration item. The default is 2 MB, and the value must be large enough to hold the largest serialized object that will be transferred.
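A minimal Kryo configuration sketch; the application name and record class are illustrative, and the buffer key follows the text (newer Spark versions use spark.kryoserializer.buffer with a size suffix instead):

import org.apache.spark.SparkConf

case class LogRecord(level: String, message: String)   // illustrative record type

// Switch to Kryo, enlarge its buffer, and register the classes that will
// be serialized most often so Kryo can skip writing full class names
val conf = new SparkConf()
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "64")
  .registerKryoClasses(Array(classOf[LogRecord]))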

Compression optimization

Compressing RDD or broadcast data in Spark is one way to improve throughput and performance. Compressed data takes far less disk space, and the disk I/O and network communication overhead of moving compressed files also drops. Compression and decompression do add CPU cost, but they save more in I/O and reduce memory usage.

Because compressing and decompressing data always costs CPU, data compression is best suited to I/O-intensive jobs that have spare CPU capacity, or to systems that are short on disk space.

Spark currently supports the LZF and Snappy codecs. Snappy offers higher compression speed, while LZF offers a higher compression ratio; choose according to the specific requirements.
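A minimal compression configuration sketch; the application name is illustrative and exact property names can vary slightly across Spark versions:

import org.apache.spark.SparkConf

// Choose the codec used for shuffle/broadcast data and also compress
// cached, serialized RDD partitions
val conf = new SparkConf()
  .setAppName("CompressionExample")
  .set("spark.io.compression.codec", "snappy")
  .set("spark.rdd.compress", "true")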


Batch optimization

Applicable scenario

When calling external resources such as database connections, convert single-record writes into per-partition batched writes, so that each partition's data is written in one go. This exploits the database's batch-write optimization, reducing overhead and the pressure on the database.

rdd.foreach { line =>
  val conn = getConnection()   // one connection opened per record (getConnection is an assumed helper)
  conn.write(line)
  conn.close()
}

After optimization

rdd.foreachPartition { items =>
  val conn = getConnection()   // one connection per partition
  for (item <- items) {
    conn.write(item)
  }
  conn.close()
}

Prefer reduceByKey over reduce

reduce is an action, so every task's result must be gathered on a single node, whereas reduceByKey aggregates values per key in a distributed way and therefore avoids reduce's single-node bottleneck.
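A minimal word-count style sketch of the distributed aggregation, assuming sc is an existing SparkContext and 1.log is the illustrative input:

// reduceByKey combines values per key on the executors before anything
// reaches the driver, avoiding the single-node bottleneck of reduce
val wordCounts = sc.textFile("1.log")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)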

Data skew optimization

This is also a key focus of optimization and is summarized in a separate chapter.


Copyright notice
This article was written by [Angryshark_128]. When reposting, please include a link to the original. Thank you.
https://yzsam.com/2022/175/202206240050384776.html