Fork me on GitHub

Spark on Yarn任务动态伸缩机制介绍

目录

  • 背景
  • 第一部分 配置实现
  • 第二部分 动态配置原理和源码分析
  • 第三部分 总结
  • 参考文献及资料

背景

Spark默认使用的是资源预分配的模式。即在任务运行之前,需要提前指定任务运行需要的资源量。但是在实际线上生产环境使用过程就存在资源浪费和不足的问题,特别是Spark Streaming类型的任务。例如很多日志数据在一天中量并不是均匀分布的,而是一个“双驼峰”。对于预分配模式,就存在日志峰值期间,运算资源不足导致数据处理的延迟,而在日志低峰时期存在资源闲置却无法释放(特别是资源管理器粗粒度模式)。使得生产线上环境资源未能高效使用。

dynamic-allocation-in-spark-19-638

SparkSpark 1.2版本后,对于Spark On Yarn模式,开始支持动态资源分配(Dynamic Resource Allocation,后文我们也简称DRA)。该机制下Spark CoreSpark Streaming任务就可以根据Application的负载情况,动态的增加和减少Executors

dynamic-allocation-in-spark-20-638

第一部分 配置实现

对于Spark on Yarn模式需要提前配置Yarn服务,主要是配置External shuffle serviceSpark 1.2开始引入)。Spark计算需要shuffle时候,每个Executor 需要把上一个 stagemapper 输出写入磁盘,然后作为 server 等待下一个stagereducer 来获取 map 的输出。因此如果 Executormap 阶段完成后被回收,reducer 将无法找到 block的位置。所以开启 Dynamic Resource Allocation 时,必须开启 External shuffle service。这样,mapper 的输出位置(元数据信息)将会由 External shuffle service(长期运行的守护进程) 来登记保存,Executor 不需要再保留状态信息,可以安全回收。

1.1 Yarn服务配置

首先需要对YarnNodeManager服务进行配置,使其支持SparkShuffle Service

  • 修改每台NodeManager上的配置文件yarn-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    <!--修改和增加-->
    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    <property>
     <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
     <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    <property>
     <name>spark.shuffle.service.port</name>
     <value>7337</value>
    </property>
  • 配置服务依赖包。将$SPARK_HOME/lib/spark-1.6.0-yarn-shuffle.jar(注意实际版本号)复制到每台NodeManager${HADOOP_HOME}/share/hadoop/yarn/lib/下。

  • 重启所有NodeManager生效配置调整。

1.2 Spark core 任务配置

1.2.1 配置方法

通常配置Saprk应用任务的参数有三种方式:

  • 修改配置文件spark-defaults.conf,全局生效;

    配置文件位置:$SPARK_HOME/conf/spark-defaults.conf,具体参数如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //启用External shuffle Service服务
    spark.shuffle.service.enabled true
    //Shuffle Service服务端口,必须和yarn-site中的一致
    spark.shuffle.service.port 7337
    //开启动态资源分配
    spark.dynamicAllocation.enabled true
    //每个Application最小分配的executor数
    spark.dynamicAllocation.minExecutors 1
    //每个Application最大并发分配的executor数
    spark.dynamicAllocation.maxExecutors 30
    spark.dynamicAllocation.schedulerBacklogTimeout 1s
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
  • spark-submit 命令配置,个性化生效;

    参考下面的案例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    spark-submit --master yarn-cluster \
    --driver-cores 2 \
    --driver-memory 2G \
    --num-executors 10 \
    --executor-cores 5 \
    --executor-memory 2G \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=5 \
    --conf spark.dynamicAllocation.maxExecutors=30 \
    --conf spark.dynamicAllocation.initialExecutors=10
    --class com.spark.sql.jdbc.SparkDFtoOracle2 \
    Spark-hive-sql-Dataframe-0.0.1-SNAPSHOT-jar-with-dependencies.jar
  • 代码中配置,个性化生效;

    参考下面的scala代码案例:

    1
    2
    3
    4
    5
    6
    val conf: SparkConf = new SparkConf()
    conf.set("spark.dynamicAllocation.enabled", true);
    conf.set("spark.shuffle.service.enabled", true);
    conf.set("spark.dynamicAllocation.minExecutors", "5");
    conf.set("spark.dynamicAllocation.maxExecutors", "30");
    conf.set("spark.dynamicAllocation.initialExecutors", "10");

接下来我们介绍详细的参数含义。

1.2.2 配置说明

Property Name Default Meaning Since Version
spark.dynamicAllocation.enabled false Whether to use dynamic resource allocation, which scales the number of executors registered with this application up and down based on the workload. For more detail, see the description here. This requires spark.shuffle.service.enabled or spark.dynamicAllocation.shuffleTracking.enabled to be set. The following configurations are also relevant: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors, and spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio 1.2.0
spark.dynamicAllocation.executorIdleTimeout 60s If dynamic allocation is enabled and an executor has been idle for more than this duration, the executor will be removed. For more detail, see this description. 1.2.0
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity If dynamic allocation is enabled and an executor which has cached data blocks has been idle for more than this duration, the executor will be removed. For more details, see this description. 1.4.0
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors Initial number of executors to run if dynamic allocation is enabled. If --num-executors (or spark.executor.instances) is set and larger than this value, it will be used as the initial number of executors. 1.3.0
spark.dynamicAllocation.maxExecutors infinity Upper bound for the number of executors if dynamic allocation is enabled. 1.2.0
spark.dynamicAllocation.minExecutors 0 Lower bound for the number of executors if dynamic allocation is enabled. 1.2.0
spark.dynamicAllocation.executorAllocationRatio 1 By default, the dynamic allocation will request enough executors to maximize the parallelism according to the number of tasks to process. While this minimizes the latency of the job, with small tasks this setting can waste a lot of resources due to executor allocation overhead, as some executor might not even do any work. This setting allows to set a ratio that will be used to reduce the number of executors w.r.t. full parallelism. Defaults to 1.0 to give maximum parallelism. 0.5 will divide the target number of executors by 2 The target number of executors computed by the dynamicAllocation can still be overridden by the spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors settings 2.4.0
spark.dynamicAllocation.schedulerBacklogTimeout 1s If dynamic allocation is enabled and there have been pending tasks backlogged for more than this duration, new executors will be requested. For more detail, see this description. 1.2.0
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout Same as spark.dynamicAllocation.schedulerBacklogTimeout, but used only for subsequent executor requests. For more detail, see this description. 1.2.0
spark.dynamicAllocation.shuffleTracking.enabled false Experimental. Enables shuffle file tracking for executors, which allows dynamic allocation without the need for an external shuffle service. This option will try to keep alive executors that are storing shuffle data for active jobs. 3.0.0
spark.dynamicAllocation.shuffleTracking.timeout infinity When shuffle tracking is enabled, controls the timeout for executors that are holding shuffle data. The default value means that Spark will rely on the shuffles being garbage collected to be able to release executors. If for some reason garbage collection is not cleaning up shuffles quickly enough, this option can be used to control when to time out executors even when they are storing shuffle data. 3.0.0

1.3 Spark Streaming 任务配置

对于Spark Streaming 流处理任务,Spark官方并未在文档中给出介绍。Dynamic Resource Allocation配置指引如下:

  • 必要配置(Spark 3.0.0)

    1
    2
    3
    4
    5
    6
    # 开启Spark Streaming流处理动态资源分配参数开关(默认关闭)
    spark.streaming.dynamicAllocation.enabled=true

    # 设置最大和最小的Executor数量
    spark.streaming.dynamicAllocation.minExecutors=1(必须正整数)
    spark.streaming.dynamicAllocation.maxExecutors=50(必须正整数,默认Int.MaxValue,即无限大)
  • 可选配置(Spark 3.0.0)

    这些参数可以不用配置,都已经提供了一个较为合理的默认值。

    1
    2
    3
    spark.streaming.dynamicAllocation.scalingUpRatio(必须正数,默认0.9)
    spark.streaming.dynamicAllocation.scalingInterval(单位秒,默认60)
    spark.streaming.dynamicAllocation.scalingDownRatio(必须正数,默认0.3)

第二部分 动态配置原理和源码分析

介绍完使用配置后,接下来将详细介绍实现原理。以便理解各参数的含义和参数调优。

2.1 Spark Core任务

为了动态伸缩Spark任务的计算资源(Executor为基本分配单位),首先需要确定的度量是任务的繁忙程度。DRA机制将Spark任务是否有挂起任务(pending task)作为判断标准,一旦有挂起任务表示当前的Executor数量不够支撑所有的task并行运行,所以会申请增加资源。

2.1.1 资源请求(Request)策略

当Spark任务开启DRA机制,SparkContext会启动后台ExecutorAllocationManager,用来管理集群的Executors。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//package org.apache.spark SparkContext.scala

val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
cleaner = cleaner, resourceProfileManager = resourceProfileManager))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())

Start()方法将ExecutorAllocationListener加入到listenerBus中,ExecutorAllocationListener通过监听listenerBus里的事件,动态添加,删除Executor。并且通过Thread不断添加Executor,遍历Executor,将超时的Executor杀掉并移除。

Spark会周期性(intervalMillis=100毫秒)计算实际需要的Executor的最大数量maxNeeded。公式如下。

1
2
val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
tasksPerExecutor).toInt

逻辑代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
if (initializing) {
0
} else {
val updatesNeeded = new mutable.HashMap[Int, ExecutorAllocationManager.TargetNumUpdates]
numExecutorsTargetPerResourceProfileId.foreach { case (rpId, targetExecs) =>
val maxNeeded = maxNumExecutorsNeededPerResourceProfile(rpId)
if (maxNeeded < targetExecs) {
decrementExecutorsFromTarget(maxNeeded, rpId, updatesNeeded)
} else if (addTime != NOT_SET && now >= addTime) {
addExecutorsToTarget(maxNeeded, rpId, updatesNeeded)
}
}
doUpdateRequest(updatesNeeded.toMap, now)
}
}
  • 当集群中有Executor出现pending task,计算判断条件maxNeeded > targetExecs,并且等待时间超过schedulerBacklogTimeout(默认1s),则会触发方法addExecutorsToTarget(maxNeeded, rpId, updatesNeeded)。对于首次增加Executor。
1
spark.dynamicAllocation.schedulerBacklogTimeout = 1s(秒)
  • 后续按照周期性时间sustainedSchedulerBacklogTimeout来检测pending task,一旦出现pending task,即触发增加Executor。
1
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout = 1s(秒)

每次(轮)触发增加Executor资源请求,增加的数量翻倍,即是一个指数数列(2的n次方),例如:1、2、4、8

2.1.2 资源释放(Remove)策略

对于移除策略如下:

  • 如果Executor闲置(maxNeeded < targetExecs)时间超过以下参数,并且executor中没有cache(数据缓存在内存),则spark应用将会释放该Executor。
1
spark.dynamicAllocation.executorIdleTimeout(单位为秒) 默认60s
  • 如果空闲Executor中有cache,那么这个超时参数为:
1
spark.dynamicAllocation.cachedExecutorIdleTimeout 默认值:Integer.MAX_VALUE(即永不超时)

对于Executor的退出,设计上需要考虑状态的问题,主要:

  • 需要移除的Executor存在cache

    如果需要移除的Executor含有RDD cache。这时候超时时间为整型最大值(相当于无限)。

    1
    2
    3
    4
    5
    6
    private[spark] val DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT =
    ConfigBuilder("spark.dynamicAllocation.cachedExecutorIdleTimeout")
    .version("1.4.0")
    .timeConf(TimeUnit.SECONDS)
    .checkValue(_ >= 0L, "Timeout must be >= 0.")
    .createWithDefault(Integer.MAX_VALUE)
  • Shuffle状态的保存问题。如果需要移除的Executor包含了Shuffle状态数据(在shuffle期间,Spark executor先要将map的输出写入到磁盘,然后该executor充当一个文件服务器,将这些文件共享给其他的executor访问)。需要提前启动External shuffle service,由专门外置服务提供存储,Executor中不再负责保存,架构上功能解耦。

另外添加和移除Executor之后,需要告知DAGSchedule进行相关信息更新。

2.1.3 配置建议

Spark的动态伸缩机制的几点建议:

  • 给Executor数量设置一个合理的伸缩区间,即[minExecutors-maxExecutors]区间值。
  • 配置资源粒度较小的Executor,例如CPU数量为3-4个。动态伸缩的最小伸缩单位是单个Executor,如果出现资源伸缩,特别是Executor数目下降后业务量突增,新申请资源未就绪,已有的Executor就可能由于任务过载而导致集群崩溃。
  • 如果程序中有shuffle,例如(reduce,groupBy),建议设置一个合理的并行数,避免杀掉过多的Executors。
  • 对于每个Stage持续时间很短的应用,不适合动态伸缩机制。这样会频繁增加和移除Executors,造成系统颠簸。特别是在 Spark on Yarn模式下资源的申请处理速度并不快。

2.2 Spark Streaming 任务

Spark Streaming任务可以看成连续运行的微(micro-batch)批任务,如果直接套用Spark Core的动态伸缩机制就水土不服了。一般一个微批任务较短(默认60秒),实际线上任务可能更小,动态伸缩的反应时间较长(特别是on Yarn模式),一个微批任务结束,动态伸缩策略还没生效。所以针对Spark Streaming任务,项目组设计新的动态机制(Spark 2.0.0 版本引入)。

提案:https://issues.apache.org/jira/browse/SPARK-12133

2.2.1 源码分析

Spark Streaming任务会统计微批任务运行时间的延迟时间,最朴素的想法就是按照这个度量指标来作为动态伸缩的触发指标。这部分源码在org.apache.spark.streaming.scheduler中:

  • 周期性计算微批运行完成的平均时间,然后和batch interval进行比较;

    这里的周期大小由参数spark.streaming.dynamicAllocation.scalingInterval决定,大小为scalingIntervalSecs * 1000。例如默认值为:60*1000毫秒,即60秒。

    通过streamingListener计算微批平均处理时间(averageBatchProcTime),然后计算微批处理率(ratio,微批平均处理时间/微批处理周期)。

    然后和参数值上限(scalingUpRatio)和下限(scalingDownRatio)进行比较。详细控制函数如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private def manageAllocation(): Unit = synchronized {
    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    if (batchProcTimeCount > 0) {
    val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    val ratio = averageBatchProcTime.toDouble / batchDurationMs
    logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    if (ratio >= scalingUpRatio) {
    logDebug("Requesting executors")
    val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    requestExecutors(numNewExecutors)
    } else if (ratio <= scalingDownRatio) {
    logDebug("Killing executors")
    killExecutor()
    }
    }
    batchProcTimeSum = 0
    batchProcTimeCount = 0
    }
  • 增加Executor数量;如果ratio >= scalingUpRatio,然后按照下面的公司增加数量:

    1
    val numNewExecutors = math.max(math.round(ratio).toInt, 1)

    例如ratio=1.6>0.9(scalingUpRatio),这时候说明有大量微批任务出现了延迟,按照公式计算numNewExecutors=2。接下来会调用requestExecutors(numNewExecutors)方法去申请2个Executor。

  • 减少Executor数量;如果ratio <= scalingDownRatio,这直接调用killExecutor()方法(方法中判断没有receiver运行的Executor)去kill Executor。

2.2.2 配置建议

Spark Streaming动态资源分配起作用前,需要至少完成一个Batch处理(batchProcTimeCount > 0)。

  • Spark Core和Spark Streaming的动态配置开关配置是分别设置的。

    如果两个配置开关同时配置为true,会抛出错误。建议如下配置:

    1
    2
    spark.dynamicAllocation.enabled=false (默认是false,可以不配置)
    spark.streaming.dynamicAllocation.enabled=true

第三部分 总结

3.1 对比

Spark Core中动态伸缩机制是基于空闲时间来控制回收Executor。而在Spark Streaming中,一个Executor每隔很短的时间都会有一批作业被调度,所以在streaming里面是基于平均每批作业处理的时间。

3.2 Structed Streaming任务动态伸缩

在spark Streaming中,最小的可能延迟受限于每批的调度间隔以及任务启动时间。所以这不能满足更低延迟的需求。如果能够连续的处理,尤其是简单的处理而没有任何的阻塞操作。这种连续处理的架构可以使得端到端延迟最低降低到1ms级别,而不是目前的10-100ms级别,这就是Spark 2.2.0版本引入新的Spark流处理框架:Structed Streaming

https://issues.apache.org/jira/browse/SPARK-20928

当然项目组自然也会考虑该框架的资源伸缩机制(未完成)

https://issues.apache.org/jira/browse/SPARK-24815

后续趋势上看,Spark项目会将更多精力放在Structed Streaming

3.3 Spark Streaming 背压机制

为了应对Spark Streaming处理数据波动,除了资源动态伸缩机制,在Spark 1.5版本项目在Spark Streaming 中引入了的背压(Backpressure)机制。

Spark Streaming任务中,当batch的处理时间大于batch interval时,意味着数据处理速度跟不上数据接收速度。这时候在数据接收端(Receiver)Executor就会开始积压数据。如果数据存储采用MEMORY_ONLY模式(内存)就会导致OOM,采用MEMORY_AND_DISK多余的数据保存到磁盘上,增加数据IO时间。

背压(Backpressure)机制,通过动态控制数据接收速率来适配集群数据处理能力。这是被动防守型的应对,将数据缓存在Kafka消息层。如果数据持续保持高量级,就需要主动启停任务来增加计算资源。

参考文献及资料

1、Job Scheduling,链接:https://spark.apache.org/docs/latest/job-scheduling.html#configuration-and-setup

2、About Spark Streaming,链接:https://www.turbofei.wang/spark/2019/05/26/about-spark-streaming

0%