Fork me on GitHub

Spark内存管理和优化

目录

  • 背景
  • 第一部分 Spark内存管理详解
  • 第二部分 Spark内存优化
  • 第三部分 总结
  • 参考文献及资料

背景

当前企业各类业务应用场景,对于大数据处理的速度追求是越来越”卷”了,特别是互联网行业。回头看,MapReduce这种基于磁盘的大数据计算框架早已过时。一方面,内存IO的速度和磁盘不是一个级别,另外内存硬件的成本逐渐降低。所以基于内存的计算框架逐渐成为主流。Spark计算框架就是在这个背景下出现(其实本质都是实现Google关于大数据的三篇论文)。虽然现在Flink已然形成流式霸权,但大部分企业(传统行业)中,Spark仍然是大数据生产环境上,最主要的分布式计算引擎。

但是在开发和维护Spark任务的时候经常出现内存资源问题。本文将详细介绍Spark的内存管理原理和各个版本的演进。内容主要基于Spark 2.3.0版本。需要读者具备Java JVM基础、Spark RDDShuffleSpark基础知识。前面主要将理论和原理,后面介绍案例。

本文图片使用visio画图,需要原文件的同学可以私信我提供。限于经验等各方面原因,难免有所疏漏或者有失偏颇。如有问题,欢迎联系一起讨论。

第一部分 Spark 内存管理详解

1.1 Spark任务运行基础

Spark在运行时,会启动ExecutorDriver两种JVM进程。其中Driver为管理容器,负责创建Spark Context、提交Spark作业(job),最后将作业转换成计算任务(Task),并协调、调度、下发给任务集群的Executor容器。Executor容器负责具体的计算任务,执行完成后将结果返回给Driver,并提供对RDD的存储服务。参考下图(对于Spark on Yarn运行模式时,Cluster Manager对应为ResourceManagerWorker Node 对应NodeManager)。

cluster-overview

Spark2.3.0版本前,Driver进程的内存管理即为典型JVM内存管理。主要区别是对于ExecutorSpark内存管理引入了堆外内存的概念。

Spark 2.3.0版本开始,Driver进程也引入了堆外内存的概念,如下图。

fenji

我们以Spark on Yarn运行模式来描述Execution的内存管理。当我们将任务提交给Yarn集群,Yarn集群首先分配和AMApplication Master)容器(即Driver)资源,然后向RMResource Manager)申请Executior容器资源。RM处理内存资源请求并分配Executior容器。

1.2 Spark内存管理演进

Spark目前版本演进中支持两种内存管理模式:静态内存管理器(Static Memory Manager)和统一内存管理器(Unified Memory Manager),主要版本演进如下:

  • Spark 1.0+版本,静态内存管理器;

  • Spark 1.6 版本,Executor容器引入堆外(off-Head)内存机制;

  • Spark 1.6+ 版本,引入统一内存管理器;

  • Spark 2.3+版本,Driver容器也引入堆外内存机制(Spark on YarnSpark on K8s);

    注:在官网Spark2.3.0版本特性说明上没找到,但查证为该版本新增特性(目前大量相关Spark的中文介绍材料还没有更新,即Driver没有堆外内存。建议大家可以留意一下官网);

  • Spark 3.0+ 版本,去除对静态内存管理器的支持;

Spark 源码中看:org/apache/spark/memory,通过MemoryManager接口实现管理Storage 内存和 Execution 内存、同一个Executor中的任务调用接口申请或释放内存。而MemoryManager有两个实现 StaticMemoryManagerUnifiedMemoryManager,即分别是静态内存管理器(Static Memory Manager)和统一内存管理器(Unified Memory Manager)。

Spark 1.6+ 版本中,可以通过参数spark.memory.useLegacyMode配置使用哪种模式,默认开启Unified Memory Manager

1
2
# 下面的配置开启静态内存管理器(Static Memory Manager),默认值为false
spark.memory.useLegacyMode = true

后文,将详细介绍两种资源分配的方式。

1.2.1 静态管理模式

所谓静态管理(Static Memory Manager)模式,即存储内存(Storage Memory)、执行内存(Execution Memory)以及其他内存(Other Memory)资源的大小,在Spark应用程序运行期间是固定值。用户需要在任务运行前进行配置,任务一旦运行中将无法动态调整。

1.2.1.1 堆内内存

下图是静态内存管理模式下的内存资源划分:

静态(堆内)

  • 预留内存(Reserved Memory):

    其中Storage内存和Execution内存均有预留区域,用于防止OOM的风险。

  • Storage内存(存储内存):

    计算公式(其中 systemMaxMemory取决于当前 JVM 堆内内存的大小):

    1
    2
    可用的Storage内存 = systemMaxMemory*spark.storage.memoryFraction*spark.storage.safetyFraction
    # spark.storage.memoryFraction 默认为0.6

    用于存储 RDD的缓存(cache)数据 和 广播(Broadcast)。这部分存储对象更多是为将来计算重用的数据。

  • Execution内存(执行内存):

    计算公式:

    1
    2
    可用的Execution内存 = systemMaxMemory*spark.shuffle.memoryFraction*spark.shuffle.safetyFraction
    # spark.shuffle.memoryFraction 默认为0.2

    用于执行Shuffle时占用的内存,主要用于存放 ShufflesJoinsSortaggregations等计算过程中的临时数据。待完成操作后,就会释放资源,寿命较短。

  • 其他内存(Otrher Memory):

    Spark内部元数据和用户自定义数据类型。

1.2.1.2 堆外内存

Spark任务本质还是运行在JVM虚机上的Java进程,所以Executor的内存管理仍然是基于JVM的内存管理。而堆内存受到 JVM统一管理,GC基于一定算法逻辑,当spark任务需要申请和释放内存的时候,并不自由灵活。具体流程如下:

  • 申请内存:
    • Spark在代码中new一个对象实例;
    • JVM从堆内内存分配空间,创建对象并返回对象引用;
    • Spark保存该对象的引用,记录该对象占用的内存;
  • 释放内存:
    • Spark记录该对象释放的内存,删除该对象的引用;
    • 等待JVM的垃圾回收机制释放该对象占用的堆内内存;

所以,对于堆内内存的申请和释放实际是由 JVM 来管理的。因此,在统计堆内内存具体使用量时,考虑性能等各方面原因,Spark 目前采用的是抽样统计的方式来计算已使用的内存。Spark不能准确记录实际可用的堆内内存,当数据量较大时,如果不能及时溢出(Spill)数据到磁盘,也就无法避免内存溢出OOM

Spark预估内存资源算法介绍:https://www.turbofei.wang/spark/2016/12/26/spark%E5%86%85%E5%AD%98%E9%A2%84%E6%B5%8B

源码:org/apache/spark/util/SizeEstimator.scala

所以Spark 从1.6 版本开始引入了Off-heap memory(SPARK-11389)。该内存资源不属于JVM内存,而是调用 Javaunsafe 相关 API 直接向操作系统申请内存,直接在服务器节点的内存中开辟空间,存储经过序列化的二进制数据。由于这种方式不需要 JVM 内存管理,所以可以避免频繁的 GC,但是缺点是用户必须自行编写内存申请和释放的逻辑。

默认情况下,堆外内存是禁用的。可以通过参数spark.memory.offHeap.enabled 参数启用它 ,并通过spark.memory.offHeap.size 参数设置内存大小,单位为字节 。

与堆内内存相比,堆外内存模型比较简单,只有Storage memoryExecution memory,其分布如下图所示:

静态(堆外)

1.2.1.2 总结

Static Memory Manager机制实现起来比较简单,但是使用中需要用户熟悉Spark的存储机制,并具有丰富的资源预设评估能力。否则很容易导致Storage memoryExecution memory空间资源使用冰火两重天,饿的饿死,饱的饱死。

于是Spark社区考虑引入新的内存管理模式,参考提案文件:Unified Memory Manager提案。

1.2.2 统一管理模式

在统一管理模式(Unified Memory Manager )机制下,Storage memoryExecution memory 共享一个内存区域,两者可以相互占用空闲区域,不再有严格的资源限制边界。

1.2.2.1 堆内(On-Heap)内存

统一(堆内)

注:上图显示spark.memory.fraction=0.75 。根据SPARK-15796,从 Spark 2.0 版本开始减少到 0.6

主要变化是预留内存:

  • Reserved预留内存

    这部分内存主要用户Spark内部对象存储。大小不可配置,写死在代码中:

    1
    2
    // 代码文件:org.apache.spark.memory.UnifiedMemoryManager
    private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

    注:对于测试环境,可以通过参数调小该资源。参数值为:spark.testing.reservedMemory

堆内内存的大小为spark任务提交是参数executor-memory或配置参数spark.executor.memory决定。

1.2.2.2 堆外(Off-Heap)内存
1.2.2.2.1 Exector堆外内存

统一(堆外)

对于Spark on YarnSpark 2.3.0)模式,通过下面的参数修改任务的对外内存大小:

1
2
3
4
5
6
spark.memory.enable.offheap.enable = true
# 默认开启

spark.yarn.executor.memoryOverhead=1024
# 单位M
# 默认值为executorMemory * 0.10, with minimum of 384m

例如下面的启动日志,默认申请了384MB的对外内存:

1
YarnAllocator:54 - Will request 2 executor container(s), each with 1 core(s) and 1408 MB memory (including 384 MB of overhead)
1.2.2.2.2 Driver堆外内存

Spark 2.3.0版本对于Driver也引入了堆外内存的机制。堆外内存的大小为:max(driverMemory * 0.10,384MB)。主要注意的是只支持对于Spark任务基于YarnK8s集群调度运行的场景。

官网参数说明如下:

spark.driver.memoryOverhead driverMemory * 0.10, with minimum of 384 The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). This option is currently supported on YARN and Kubernetes.

可以在任务提交日志中看下面下面的日志:

1
2
3
4
# driverMemory = 1G, max(driverMemory * 0.10,384MB) = 384MB
Client:54 - Will allocate AM container, with 1408 MB memory including 384 MB overhead
# driverMemory = 4G, max(driverMemory * 0.10,384MB) = 409MB
Client:54 - Will allocate AM container, with 4505 MB memory including 409 MB overhead
1.2.2.3 计算案例

我们在测试集群上运行一个Spark任务,并计算和验证上面的理论,Yarn集群资源限制:

参数yarn.scheduler.maximum-allocation-mb限制单个容器最大分配内存大小,高于该值请求无法生效。

同样对于单个Node Manager也有资源限制,由参数yarn.nodemanager.resource.memory-mb控制。即单个Node Manager可提供给Yarn集群的物理内存资源。

1
2
3
4
# nodemanager能够申请的最大内存,默认值为30G
yarn.nodemanager.resource.memory-mb: 30G
# 调度时一个container能够申请的最大资源,默认值为4G
yarn.scheduler.maximum-allocation-mb: 4G

对于统一内存管理模式,运行脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
[root@quickstart spark-2.3.0]# cat run_example.sh 
export YARN_CONF_DIR=/etc/hadoop/conf
/home/spark-2.3.0/bin/spark-submit \
--conf "spark.executorEnv.JAVA_HOME=/home/openjdk" \
--conf "spark.yarn.appMasterEnv.JAVA_HOME=/home/openjdk" \
--master yarn \
--deploy-mode cluster \
--num-executors 2 \
--driver-memory 1G \
--executor-memory 1G \
--class org.apache.spark.examples.SparkPi \
/home/spark-2.3.0/examples/jars/spark-examples_2.11-2.3.0.jar

任务运行后我们查看Spark UI,发现Storage Memory的大小为384.1MB

example_ui

接下来我们使用之前理论进行计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# Spark任务提交参数
spark.executor.memory=1g
spark.memory.fraction=0.6
spark.memory.storageFraction=0.5

# 堆内存
Java Heap Memory = 1 GB = 1 * 1024 MB = 1024 MB
# 保留内存
Reserved Memory = 300 MB
#
Usable Memory = (Java Heap Memory — Reserved Memory)= 1024 MB - 300 MB = 724 MB
#
Other Memory = Usable Memory * (1.0 — spark.memory.fraction)
= 724 MB * (1.0 - 0.6) = 724 MB * 0.4
= 289.6 MB

Spark Memory = Usable Memory * spark.memory.fraction
= 724 MB * 0.6
= 434.4 MB

Spark Storage Memory = Spark Memory * spark.memory.storageFraction
= 434.4 MB * 0.5 = 217.2 MB

Spark Execution Memory = Spark Memory * (1.0 - spark.memory.storageFraction)
= 434.4 MB * ( 1 - 0.5) = 434.4 MB

这个计算结果(Spark Storage Memory=217.2 MB)和UI显示的Storage Memory(384.1MB)有很大差距的。事实上UI中的Storage Memory = Spark Storage Memory+Spark Execution Memory,即434.4 MB

但是这个结果和UI仍然有差异。我们查看Spark的源码发现可用内存(Usable Memory)的计算方法并不是堆栈内存,而是Runtime.getRuntime.maxMemory

1
2
3
4
5
6
7
8
9
10
11
//org.apache.spark.memory.UnifiedMemoryManager
private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)

//获取最大的内存值
(usableMemory * memoryFraction).toLong
}

Java 虚拟机中(JDK8)堆栈内存被划分为新生代(NewGen)和老年代(OldGen ),而新生代又被划分为:EdenFrom SurvivorTo Survivor。即有下面公式:

1
2
ExecutorMemory =  Eden + 2 * Survivor + OldGen  = 1G
systemMemory = Runtime.getRuntime.maxMemory=ExecutorMemory - Survivor

所以UI显示的大小会略小于上面计算的结果,属于正常。

1.2.3 动态占用模式

统一内存管理模式最大新颖点,就是动态占用机制的引入。在实际生产线上Spark任务运行时,数据计算是动态变化的,无法在任务运行前合理的分配好资源,动态占用模式正是解决该需求场景的。

动态占用机制的规则如下:

  • Spark任序提交时,根据spark.memory.storageFraction 参数设置Storage内存和Execution内存 (初始化)。
  • 运行时,如果Storage内存和Execution内存的空间均不够(标准是:存储空间不够放下一个完整的块(Block)),会根据LRU缓存策略,将数据存储到磁盘。若一方空间不足,而对方空间空余时,可借用对方的空间。
  • Storage占用对方的内存,会将占用的部分转移到硬盘上,然后“归还”借来的空间。
  • Execution占用了对方的内存时,却无法立刻“归还”借来的空间,只能等待释放。由于Shuffle过程生成的数据(本质是文件)会在后面使用,而Cache中的数据不一定会在后面使用,因此回收内存可能会导致性能严重下降。

20200604212151736

1.2.3 对比

最后表格对比一下堆内堆外内存,Auto的自动但管理不精细。

内存类别 管理方式 对比
on-heap 由JVM管理 受到JVM GC管理,容易OOM
off-heap 手动管(spark) 用户编写内存申请和释放的逻辑

第二部分 Spark内存优化

讲完枯燥的理论后,我们要使用这些理论指导日常的研发调优、生产问题的分析定位。即理论指导实践,解决问题才是目的。

通常我们说Spark是基于内存计算的,但是并不是说所有的数据对象都是缓存在内存中。如果数据量较大而内存资源不足的场景下,Spark也会把数据缓存在磁盘中。但是为了提升任务运行效率,需要尽量避免缓存数据溢出(Spill)到磁盘。

那么如何判断Spark任务的内存资源管理是合适的?通过上文的理论介绍,Spark任务的内存资源主要使用分配有:存储(Storage)内存和执行(Executor)内存,所以我们分别讨论。通常通过Spark UI界面,来查看Spark任务资源实时使用情况。

2.1 存储(Storage)内存优化

Storage子菜单界面中,如果发现缓存cache中数据开始溢出到磁盘上(Size on Disk)。这时候说明预留给 Storage内存资源不足。例如下图,Size on Disk数值为4.7GB,表示任务中有4.7GB的数据因为资源不足溢出到磁盘进行缓存。

disk

这时候可以尝试先通过适当增加Storage内存资源解决。涉及参数有:

  • spark.memory.fraction,这是资源比例参数(存储(Storage)内存+执行(Executor)内存资源整体),默认是0.6。可以适当调大该值,这样 ExecutionStorage 的整体可用内存资源变大。
  • spark.memory.storageFraction,同样是资源比例参数(StorageStorage+Executor 内存总和的比例)。我们知道在统一内存管理机制下,StorageExecutor 之间内存资源可以动态伸缩借用。但是spark.memory.storageFraction越大,任务运行过程中,Storage 能用的内存就会越多。可以适当调大该值。反之,如果任务更吃 Executor 内存,适当把这个值调小。

2.2 执行(Executor)内存

Stages子菜单界面中,点击最新完成的stage,页面显示stage的运行情况汇总。发现shuffle spill (disk)的数值较大,Executor内存资源不足。例如下图,任务中shuffle的时候,2个Executor平均使用300M的磁盘空间。

shuffleDisk

这时候通过调整参数,适当增加Executor内存资源解决。

2.3 其他内存问题

2.3.1 Driver内存

报错:

1
Job aborted due to stage failure: Total size of serialized results of 334502 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

处理: 对于collect和一些操作,driver会接收各task执行后的数据,spark.driver.maxResultSize参数控制接收数据大小,建议先检查代码,避免或减少take,collect操作,如果不成功再考虑增大该参数。尽量不要使用collect操作即可。

第三部分 总结

Spark引入堆外内存,其实是Spark钨丝计划(Spark Tungsten)的一部分。该计划的细节可以参考databricks公司的官网博客:Project Tungsten: Bringing Apache Spark Closer to Bare Metal 。文章总结了目前Spark的瓶颈主要在CPU和内存。以前磁盘和网络I/O随着高速网络和廉价SSD的使用,已经不再是性能瓶颈。

钨丝计划(Project Tungsten)包含三个方面:

  • 内存管理(Memory Management)和二进制处理(Binary Processing):利用应用的语义(application semantics)来更明确地管理内存,同时消除JVM对象模型和垃圾回收开销。

  • 缓存友好的计算(Cache-aware Computation):使用算法和数据结构来实现内存分级结构(MemoryHierarchy)。

  • 代码生成(Code Generation,CG):使用代码生成来利用新型编译器和CPU。

其中本文介绍Spark内存优化就是钨丝计划的内存管理部分。

计划安排:

参考文献及资料

1、《Deep Dive: Apache Spark Memory Management》介绍视频,链接:https://youtu.be/dPHrykZL8Cg

2、探索Spark Tungsten的秘密,链接:https://github.com/hustnn/TungstenSecret

本文标题:Spark内存管理和优化

文章作者:rong xiang

发布时间:2021年11月16日 - 13:11

最后更新:2022年10月25日 - 23:10

原始链接:https://zjrongxiang.github.io/posts/e4bbf927/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

0%