当前位置:网站首页>Spark参数调优实践
Spark参数调优实践
2022-06-24 06:39:00 【Angryshark_128】
参数设置方法
Spark任务在提交时,可以通过以下几种方式进行参数设置:
环境变量
通过配置文件spark-env.sh添加,如
export JAVA_HOME=/usr/local/jdk
export SCALA_HOME=/usr/local/scala
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_WEBUI_PORT=8088
export SPARK_WORKER_WEBUI_PORT=8099
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=8g
这种参数设置是全局的,并不适合所有任务,因而可以当作默认的来使用,遇到具体任务,再单独设置。
项目代码
在SaprkConf中设置,如
val conf=new SparkConf().setMaster("local").setAppName("Myapplication").set("spark.executor.memory","1g")
val sc=new SparkContext(conf)
或者通过System.properties设置,如
System.setProperty("spark.executor.memory","14g")
System.setProperty("spark.worker.memory","16g")
val conf=new SparkConf().setAppName("Simple Application")
val sc=new SparkContext(conf)
提交脚本
在任务提交时设置,如
spark-submit \
--spark.executor.memory 14g \
--spark.worker.memory 16g \
--name SimpleTest \
-jars ./lib/*.jar \
--classpath com.example.test \
sparkTest.jar
若没有明确要求,个人建议可以在任务提交脚本中进行设置,更加灵活。
| 参数名 | 格式 | 参数说明 |
|---|---|---|
| –master | MASTER_URL | 如spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local |
| –deploy-mode | DEPLOY_MODE | Client或者master,默认是client |
| –class | CLASS_NAME | 应用程序的主类 |
| –name | NAME | 应用程序的名称 |
| –jars | JARS | 逗号分隔的本地jar包,包含在driver和executor的classpath下 |
| –packages | 包含在driver和executor的classpath下的jar包逗号分隔的”groupId:artifactId:version”列表 | |
| –exclude-packages | 用逗号分隔的”groupId:artifactId”列表 | |
| –repositories | 逗号分隔的远程仓库 | |
| –py-files | PY_FILES | 逗号分隔的”.zip”,”.egg”或者“.py”文件,这些文件放在python app的PYTHONPATH下面 |
| –files | FILES | 逗号分隔的文件,这些文件放在每个executor的工作目录下面 |
| –conf | PROP=VALUE | 固定的spark配置属性,默认是conf/spark-defaults.conf |
| –properties-file | FILE | 加载额外属性的文件 |
| –driver-memory | MEM | Driver内存,默认1G |
| –driver-java-options | 传给driver的额外的Java选项 | |
| –driver-library-path | 传给driver的额外的库路径 | |
| –driver-class-path | 传给driver的额外的类路径 | |
| –executor-memory | MEM | 每个executor的内存,默认是1G |
| –proxy-user | NAME | 模拟提交应用程序的用户 |
| –driver-cores | NUM | Driver的核数,默认是1。这个参数仅仅在standalone集群deploy模式下使用 |
| –supervise | Driver失败时,重启driver。在mesos或者standalone下使用 | |
| –verbose | 打印debug信息 | |
| –total-executor-cores | NUM | 所有executor总共的核数。仅仅在mesos或者standalone下使用 |
| –executor-core | NUM | 每个executor的核数。在yarn或者standalone下使用 |
| –driver-cores | NUM | Driver的核数,默认是1。在yarn集群模式下使用 |
| –queue | QUEUE_NAME | 队列名称。在yarn下使用 |
| –num-executors | NUM | 启动的executor数量。默认为2。在yarn下使用 |
性能查看方式
Spark任务可以通过以下几种方式进行查看:
1)Web UI
2)Driver程序控制台日志
3)logs文件夹下日志
4)work文件夹下日志
5)Profiler工具,例如,一些JVM的Profiler工具,如Yourkit、Jconsole或者JMap、JStack等命令。更全面的可以通过集群的Profiler工具,如Ganglia、Ambaria等。
任务优化
Spark任务的优化朱要从以下几个方面进行:
合并小分区
适用情况
适用于filter操作后,过滤掉大量数据的情况。
用户使用Spark的过程中,常常会使用filter算子进行数据过滤。而频繁的过滤或者过滤掉的数据量过大就会产生问题,造成大量小分区的产生(每个分区数据量小)。由于Spark是每个数据分区都会分配一个任务执行,如果任务过多,则每个任务处理的数据量很小,会造成线程切换开销大,很多任务等待执行,并行度不高的问题,是很不经济的。
解决方案
使用repartion/coalesce重新进行分区合并
val rdd=sc.textFile("1.log").filter(line=>lines.contains("error")).filter(line=>line.contains(info)).repartition(10).collect();
增加并行度
适用情况
并行读取文件,进行Map/Filter/Reduce等操作。
解决方案
调用函数时添加并行度参数,如:reduceByKey(func,numPartitions)
配置spark.default.parallelism参数
官方推荐每个CPU分配2~3个任务,但在实际生产中并行度数量需要权衡;
如果并行度太高,任务数太多,就会产生大量的任务启动和切换开销;
如果并行度太低,任务数太小,就会无法发挥集群的并行计算能力,任务执行过慢,同时可能会造成内存combine数据过多占用内存,而出现内存溢出(out ofmemory)的异常。
DAG优化
解决方案
同一个Stage内容纳尽可能多的算子,以减少shuffle操作,减少任务启动和切换的开销
复用缓存数据,使用cache和persist将数据缓存
内存优化
解决方案
JVM调优
尽量少使用数据量小的对象。
不同的Java对象都会有一个对象头(object header),这个对象头大约为16byte,包含指向这个对象的类的指针等信息,对一些只有少量数据的对象,这是极为不经济的。例如,只有一个Int属性的对象,这个头的信息所占空间会大于对象的数据空间。
Java中的字符串(String)占用40byte空间。String的内存是将真正字符串的信息存储在一个char数组中,并且还会存储其他的信息,如字符串长度,同时如果采用UTF-16编码,一个字符就占用2byte的空间。综合以上,一个10字符的字符串会占用超过60byte的内存空间。
评估内存占用大小
计算数据在集群内存占用的空间的大小的最好方法是创建一个RDD,读取这些数据,将数据加载到cache,在驱动程序的控制台查看SparkContext的日志。这些日志信息会显示每个分区占用多少空间(当内存空间不够时,将数据写到磁盘上),然后用户可以根据分区的总数大致估计出整个RDD占用的空间。例如,下面的日志信息。
INFO BlockManagerMasterActor:Added rdd_0-1 in memory on mbk.local:50311(size:717.5 KB,free:332.3 MB)
这表示RDD0的partition1消耗了717.5KB内存空间。
调整数据结构
减少一些除原始数据以外的Java特有信息的消耗,如链式结构中的指针消耗、包装数据产生的元数据消耗等。
在设计和选用数据结构时能用数组类型和基本数据类型最好,尽量减少一些链式的Java集合或者Scala集合类型的使用。
减少对象嵌套。例如,使用大量数据量小、个数多的对象和内含指针的集合数据结构,这样会产生大量的指针和对象头元数据的开销。
考虑使用数字的ID或者枚举对象,而不是使用字符串作为key键的数据类型,字符串的元数据和本身的字符编码问题产生的空间占用过大。
序列化存储RDD
在程序中可以通过设置StorageLevels这个枚举类型来配置RDD的数据存储方式。用户如果希望在内存中缓存数据,则官方推荐使用Kyro的序列化库进行序列化,因为Kyro相比于Java的标准序列化库序列化后的对象占用空间更小,性能更好。
GC调优
当JVM需要替换和回收旧对象所占空间来为新对象提供存储空间时,根据JVM垃圾回收算法,JVM将遍历所有Java对象,然后找到不再使用的对象进而回收。
设计数据结构时应该尽量使用创建更少的对象的数据结构,如尽量采用数组Array,而少用链表的LinkedList,从而减少垃圾回收开销。
对GC来说,一个重要的配置参数就是内存给RDD用于缓存的空间大小。
默认情况下,Spark用配置好的Executor 60%的内存(spark.executor.memory)缓存RDD。这就意味着40%的剩余内存空间可以让Task在执行过程中缓存新创建的对象。
在有些情况下,用户的任务变慢,而且JVM频繁地进行垃圾回收或者出现内存溢出(out of memory异常),这时可以调整这个百分比参数为50%。这个百分比参数可以通过配置spark-env.sh中的变量spark.storage.memoryFraction=0.5进行配置。
同时结合序列化的缓存存储对象减少内存空间占用,将会更加有效地缓解垃圾回收问题。
OOM优化
为了避免内存溢出,可以从以下几个方面进行优化:
检查程序,看是否有死循环或不必要重复创建大量对象的地方;
增加Java虚拟机中Xms(初始堆大小)和Xmx(最大堆大小)参数的大小,如setJAVA_OPTS=-Xms256m- Xmx1024m;
优化数据结构和相关配置参数
临时目录空间优化
配置参数spark.local.dir能够配置Spark在磁盘的临时目录,默认是/tmp目录。在Spark进行Shuffle的过程中,中间结果会写入Spark在磁盘的临时目录中,或者当内存不能够完全存储RDD时,内存放不下的数据会写到配置的磁盘临时目录中。
这个临时目录设置过小会造成No space left ondevice异常。也可以配置多个盘块spark.local.dir=/mn1/spark,/mnt2/spar,/mnt3/spark来扩展Spark的磁盘临时目录,让更多的数据可以写到磁盘,加快I/O速度。
网络传输优化
大任务分发
在任务的分发过程中会序列化任务的元数据信息,以及任务需要的jar和文件。
任务的分发是通过AKKA库中的Actor模型之间的消息传送的。因为Spark采用了Scala的函数式风格,传递函数的变量引用采用闭包方式传递,所以当需要传输的数据通过Task进行分发时,会拖慢整体的执行速度。
配置参数spark.akka.frameSize(默认buffer的大小为10MB)可以缓解过大的任务造成AKKA缓冲区溢出的问题,但是这个方式并不能解决本质的问题。
使用广播变量
Broadcast主要用于共享Spark在计算过程中各个task都会用到的只读变量,Broadcast变量只会在每台计算机器上保存一份,而不会每个task都传递一份,这样就大大节省了空间,节省空间的同时意味着传输时间的减少,效率也高。
collect结果过大优化
通过SparkContext将每个分区执行变为数组,返回主节点后,将所有分区的数组合并成一个数组。这时,如果进行Collect的数据过大,就会产生问题,大量的从节点将数据写回同一个节点,拖慢整体运行时间,或者可能造成内存溢出的问题。
解决方案
当收集的最终结果数据过大时,可以将数据存储在分布式的HDFS或其他分布式持久化层上。将数据分布式地存储,可以减小单机数据的I/O开销和单机内存存储压力。
或者当数据不太大,但会超出AKKA传输的Buffer大小时,需要增加AKKA Actor的buffer,可以通过配置参数spark.akka.frameSize(默认大小为10MB)进行调整。
序列化优化
Spark中提供了两个序列化库和两种序列 化方式:使用Java标准序列化库进行序列化的方式和使用Kyro库进行序列化的方式。
Java标准序列化库兼容性好,但体积大、速度慢,Kyro库兼容性略差,但是体积小、速度快。所以在能使用Kyro的情况下,还是推荐使用Kyro进行序列化。
可以通过spark.serializer="org.apache.spark.serializer.KryoSerializer"来配置是否使用Kyro进行序列化,这个配置参数决定了Shuffle进行网络传输和当内存无法容纳RDD将分区写入磁盘时,使用的序列化器的类型。
如果对象占用空间很大,需要增加Kryo的缓冲区容量,就需要增加配置项spark.kryoserializer.buffer.mb的数值,默认是2MB,但参数值应该足够大,以便容纳最大的序列化后对象的传输。
压缩优化
在Spark中对RDD或者Broadcast数据进行压缩,是提高数据吞吐量和性能的一种手段。压缩数据,可以大量减少磁盘的存储空间,同时压缩后的文件在磁盘间传输和I/O以及网络传输的通信开销也会减小;当然压缩和解压缩也会带来额外的CPU开销,但可以节省更多的I/O和使用更少的内存开销。
压缩数据,可以最大限度地减少文件所需的磁盘空间和网络I/O的开销,但压缩和解压缩数据总会增加CPU的开销,故最好对那些I/O密集型的作业使用数据压缩——这样的作业会有富余的CPU资源,或者对那些磁盘空间不富裕的系统。
Spark目前支持LZF和Snappy两种解压缩方式。Snappy提供了更高的压缩速度,LZF提供了更高的压缩比,用户可以根据具体的需求选择压缩方式。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3uhT7DHL-1609390347651)(695FD3180A7D470FA6D0599F0D472776)]
批处理优化
适用情况
调用外部资源,如数据库连接,可以将单条记录写转化为数据库的批量写,每个分区的数据写一次,这样可以利用数据库的批量写优化减少开销和减轻数据库压力。
rdd.foreach(line=>conn=getConnection()
conn.write(line)
conn.close)
优化后
rdd.mapPartition(line=>conn=getConnection()
for(item<-items)
conn.write(item)
conn.close
)
尽量使用reduceByKey不使用reduce
reduce是action操作,需要将各个任务结果汇集到一个节点上,而reduceByKey则是分布式的,因而不存在reduce的瓶颈。
数据倾斜优化
这也是优化时的重点,单独章节进行总结。
边栏推荐
- Come on, it's not easy for big factories to do projects!
- Spirit information development log (1)
- Deploy DNS server using dnsmasq
- [5 minutes to play lighthouse] take you to the light kubernetes release k3s
- Virtual file system
- Rockscache schematic diagram of cache operation
- 网吧管理系统与数据库
- What are the easy-to-use character recognition software? Which are the mobile terminal and PC terminal respectively
- leetcode:84. The largest rectangle in the histogram
- How to build an app at low cost
猜你喜欢

解读AI机器人产业发展的顶层设计

华为云低时延技术的九大绝招

Database stored procedure begin end

Application of intelligent reservoir management based on 3D GIS system

oracle sql综合运用 习题

原神方石机关解密

LuChen technology was invited to join NVIDIA startup acceleration program

Nine unique skills of Huawei cloud low latency Technology

数据库 存储过程 begin end

About Stacked Generalization
随机推荐
Easyscreen live streaming component pushes RTSP streams to easydss for operation process sharing
What is the main function of cloud disk? How to restore deleted pictures
Oceanus kudu sink summary
浅谈如何运营好小红书账号:利用好长尾词理论
[JUC series] completionfuture of executor framework
Nine possibilities of high CPU utilization
Do you want to research programming? I got six!
The data synchronization tool dataX has officially supported reading and writing tdengine
如何低成本构建一个APP
程序员使用个性壁纸
云上本地化运营,东非第一大电商平台Kilimall的出海经
puzzle(019.1)Hook、Gear
目标5000万日活,Pwnk欲打造下一代年轻人的“迪士尼乐园”
Interpreting top-level design of AI robot industry development
35岁危机?内卷成程序员代名词了
How to register the cloud service platform and what are the advantages of cloud server
typescript vscode /bin/sh: ts-node: command not found
SAP实施项目上的内部顾问与外部顾问,相互为难还是相互成就?【英文版】
What are the easy-to-use character recognition software? Which are the mobile terminal and PC terminal respectively
Microsoft Security, which frequently swipes the network security circle, gives us some enlightenment this time?