该文档仅提供性能调优的参考
资源调优
Spark 可以通过参数配置资源分配。资源分配不合理会导致 job 运行过慢甚至失败。资源调优就是为当前 job 分配合适的资源,提高资源利用率最终加快任务运行速度。
Driver
配置项 | 默认值 | 参考值 | 备注 |
spark.driver.memory | 1g | 不 OOM 下越小越好 | 如果数据需收集到 driver ,那么需要根据数据大小配置内存,防止 OOM |
spark.driver.memoryOverhead | max (driverMemory * 0.1, 384) | / | |
spark.driver.cores | 1 | / |
Executor
内存
- 太大的内存会导致 JVM 垃圾回收变慢,尽量小于 64 g
- executor 申请的总内存不能超过 node/container 的总内存,申请的内存大小为以下参数总和
- spark.executor.memory
- spark.executor.memoryOverhead
- spark.memory.offHeap.size
- spark.executor.pyspark.memory
CPU
- 1 core 会导致无法利用 JVM 多线程,以及会使一些 broadcast 相关的参数失效
- core 设置太多可能会使 job 速度运行变慢
配置项 | 默认值 | 建议值 | 备注 |
spark.executor.memory | 1g | 4-8 g | 在不 OOM 情况下设小 |
spark.executor.memoryOverhead | max (executor * 0.1, 384) | / | 用于虚拟机的开销、内部的字符串、本地开销等 |
spark.memory.offHeap.enabled | false | / | 当有需要堆外内存的操作时才配置,一般默认 false 即可 |
spark.memory.offHeap.size | 0 | / | offHeap.enabled true 才生效 |
spark.executor.cores | - Yarn :1 | ||
- standalone:all available cores | 4 | ||
Executor 数量 | 无 | 总 core / executor.cores |
Executor 内存分配
指 spark.executor.memory 的内存分配
Spark 的内存分为两大类:执行内存和存储内存。
- 执行内存:在shuffle, join, aggregation 等计算中使用的内存。
- 存储内存:集群中缓存和 broadcast 使用的内存。
spark.memory.fraction (默认 0.6)
用于执行内存和存储内存的百分比,剩余是为用户数据结构,Spark metadata 等预留的。在预留大小足够下提高此值,可以减少溢写磁盘。
存储内存和执行内存共享同一块空间,且有动态占用机制
- 双方基础内存占比由 spark.memory.storageFraction 决定
- 一方空闲时,另一方可以占用
- 当执行内存不足,且存在被占用内存时:可要求存储内存归还占用部分。存储内存会将占用部分转存到磁盘
- 当存储内存不足,且存在被占用内存时:不可要求执行内存归还
spark.memory.storageFraction(默认0.5):
不受驱逐的存储内存百分比,即这个占比的内存一定不会被驱逐到磁盘中
用于 task 的执行内存大小可以计算得出
spark.executor.memory * spark.memory.fraction *(1-spark.memory.storageFraction)/ spark.executor.cores
总结:
- 发生磁盘溢写时:可尝试调大 spark.executor.memory 或提高 spark.memory.fraction
- spark.memory.storageFraction 一般取默认值即可,不太推荐在溢写时调小该值
动态内存分配
Spark 提供了 Dynamic Executor Allocation ,它能够动态调整 executor 数量,以下场景可以考虑配置
- 和其他团队共享集群
- 在乎 cost
- 某一个 application 有若干不同大小的 job
主要参数如下
spark.dynamicAllocation.enable false
spark.dynamicAllocation.executorIdleTimeout 60s // 如果任务执行时间普遍短,可以调小 timeout
spark.dynamicAllocation.initialExecutors minExecutors // 对于大的 job,调大 initialExecutors
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.maxExecutors infinity //共享的 spark 集群最好配置 maxExecutors
并行度
Spark 并行度 = min( 任务数 = 分区数,总核数 )
一个参考值:分区数 = 总核心数的 2-3 倍
分区初始数量
分区数会影响 Spark 集群的并行度,下面有两种方式来计算分区数量
- 内存资源紧缺时:
Math.round(inputDataSize/availableTaskMemoryMB()).toInt
其中 inputDataSize 为每个 task 的数据大小,可以从 Spark UI 上查看;availableTaskMemoryMB 即为上文计算的 用于 task 的执行内存大小
- 内存资源足够时:分区数量先设置为集群可用总 cores *2,然后逐步往上调,寻找一个最佳分区数(core 的整数倍)
什么是最佳分区数呢?执行时间最短就是最佳,此外还可以根据 Spark UI 判断
- 分区数量太多的表现:executor cpu 内存利用率过低,过多 pending 的 task
- 分区数量太少的表现:executor 空闲
分区调整
- 分区数量调整:使用
repartition()
可以调整分区数量,但会发生 shuffle,若减少分区,可以尝试使用coalesce()
来避免 shuffle (一些特殊场景 repartition 更优,其增加的 shuffle 可能会减少其他地方的 shuffle,降低整体的时间) - 分区策略调整:若发生数据倾斜,可以通过调整合适的分区策略避免
Shuffle 调优
Shuffle 调优的目的是:避免 spill 到 disk 导致任务速度变慢
当在 Spark UI 观察到存在溢写时,一般有以下手段
- 增加内存
- 配置堆外内存
- 增加分区以减少每个任务的数据量
- 调整 shuffle 参数
相关配置如下
配置项 | 默认值 | 推荐 | 备注 |
spark.executor.memory | 1g | 增加 | 内存足够增加内存是最好的方式 |
spark.sql.shuffle.partitions | 200 | 增加 | 调大分区数可以减少每个分区的数据量防止 spill |
- spark.memory.offHeap.enable | |||
- spark.memory.offHeap.size | 关闭 | 打开 | 配置堆外内存减少 shuffle |
spark.memory.fraction | 0.6 | 增加 | 增加存储内存和执行内存的总额 |
spark.shuffle.file.buffer | 32k | 64k | shuffle write 时,会先写到 BufferedOutputStream 缓冲区中,然后再溢写到磁盘。增加此值可以减少 IO 次数,推荐 64k |
spark.shuffle.service.index.cache.size | 100m | 减少 | 缓存的 shuffle 索引文件中索引的数量,减少该值可以防止内存爆炸 |
spark.io.compression.lz4.blockSize | 32k | 增加 | 增大此配置以减少 shuffle 文件的大小 |
spark.shuffle.service.enabled | false | / | 启用外部 shuffle 服务,这样 spark shuffle file 不会保存在 executor |
spark.shuffle.io.backLog | -1 | / | 启用 shuffle.service 时,控制 accept queue |
spark.shuffle.registration.timeout | 5000 | / | 启用 shuffle.service 时,注册的超时时间,推荐增大 |
另外还可以优化代码(SQL or RDD API)防止 shuffle
- Join 时广播小表(如使用 Broadcast Hash Join)
- 尽量使用窄依赖而不是宽依赖
- 使用 ReduceByKey 而不是 GroupbyKey
- 在要进行宽依赖之前,或者进行完一系列复杂操作后,或进行完某些耗时操作后,persist RDD 进行缓存
Spark 调优工具
推荐使用一些 Spark 调优工具来帮助调优
- Sparklens
- Sparklint
- Dr Elephant
参考文档
- Understanding Spark Tuning
- Apache Spark Performance Tuning and Optimizations for Big Datasets
- Should I repartition