目录
- 背景
- 第一部分 Spark内存管理详解
- 第二部分 Spark参数说明
- 第三部分 Spark内存优化
- 第四部分 常见线上问题解决
- 参考文献及资料
背景
Shuffle英文中含义是:“洗牌”,这个含义和分布式大数据计算框架有什么联系呢?我们先从大数据计算的”Hello World”案例讲解。
给定一个英文文本文件(存储在分布式文件系统中),然后计算每个单词的词频。这是我们运行大数据计算框架最常用的案例。通常我们将每个单词作为键(key),出现的频率为1,这样就有一个键值对(例如:{“the”:1})。最后我们将相同键值进行合并统计即可。但是在分布式处理场景下,如何统计分布在不同节点的键值对呢?唯一的办法就是需要把相同的键值对收集到同一个节点进行汇总计算求和。
这个计算过程中(参考下图),收集各个节点键值对,并分发到同个节点汇总的过程,就是Shuffle。相当于对数据进行从“洗牌”。
注:相同的键值会被分发到相同的reduce
计算节点。
在分布式计算框架中,Shuffle的过程代价比较大,因为数据的重分配,意味着数据在磁盘、内存中的I/O以及节点通信中的网络I/O。所以Shuffle阶段的设计优劣是决定一个分布式计算框架优劣的关键因素。
正是由于Shuffle的计算几乎需要消耗所有类型的硬件资源,比如CPU、内存、磁盘与网络,在绝大多数的Spark作业中,Shuffle往往是作业执行性能的瓶颈。
第一部分 Shuffle概述
1.1 MapReduce shuffle
上文我们案例其实是MapReduce计算框架。在MapReduce计算框架中Shuffle阶段是链接Map和Reduce之间的桥梁。Spark是基于MapReduce思想实现的计算框架,同样也存在shuffle流程。
在正式介绍Spark shuffle前,我们先引用介绍MapReduce shuffle过程来作为入门和比较。下图简要展示了MapReduce框架中Shuffle流程(图中黑框部分)。
MapReduce中,根据处理特点,将Shuffle分为两个子阶段:Map端和Reduce端。
Map端
- 环形内存缓存区:每个split数据交由一个map任务处理,map的处理结果不会直接写到硬盘上,会先输送到环形内存缓存区中,默认的大小是100M(可通过配置修改),当缓冲区的内容达到80%后会开始溢出,此时缓存区的溢出内容会被写到磁盘上,形成一个个spill file,注意这个文件没有固定大小。
- 在内存中经过分区、排序后溢出到磁盘:分区主要功能是用来指定 map 的输出结果交给哪个 reduce 任务处理,默认是通过 map 输出结果的 key 值取hashcode 对代码中配置的 redue task数量取模运算,值一样的分到一个区,也就是一个 reduce 任务对应一个分区的数据。这样做的好处就是可以避免有的 reduce 任务分配到大量的数据,而有的 reduce 任务只分配到少量甚至没有数据,平均 reduce 的处理能力。并且在每一个分区(partition)中,都会有一个 sort by key 排序,如果此时设置了 Combiner,将排序后的结果进行 Combine 操作,相当于 map 阶段的本地 reduce,这样做的目的是让尽可能少的数据写入到磁盘。
- 合并溢出文件:随着 map 任务的执行,不断溢出文件,直到输出最后一个记录,可能会产生大量的溢出文件,这时需要对这些大量的溢出文件进行合并,在合并文件的过程中会不断的进行排序跟 Combine 操作,这样做有两个好处:减少每次写入磁盘的数据量&减少下一步 reduce 阶段网络传输的数据量。最后合并成了一个分区且排序的大文件,此时可以再进行配置压缩处理,可以减少不同节点间的网络传输量。合并完成后着手将数据拷贝给相对应的reduce 处理,那么要怎么找到分区数据对应的那个 reduce 任务呢?简单来说就是 JobTracker 中保存了整个集群中的宏观信息,只要 reduce 任务向 JobTracker 获取对应的 map 输出位置就可以了。具体请参考上方的MapReduce工作原理。
Reduce端
reduce 会接收到不同 map 任务传来的有序数据,如果 reduce 接收到的数据较小,则会存在内存缓冲区中,直到数据量达到该缓存区的一定比例时对数据进行合并后溢写到磁盘上。随着溢写的文件越来越多,后台的线程会将他们合并成一个更大的有序的文件,可以为后面合并节省时间。这其实跟 map端的操作一样,都是反复的进行排序、合并,这也是 Hadoop 的灵魂所在,但是如果在 map 已经压缩过,在合并排序之前要先进行解压缩。合并的过程会产生很多中间文件,但是最后一个合并的结果是不需要写到磁盘上,而是可以直接输入到 reduce 函数中计算,每个 reduce 对应一个输出结果文件。
Spark根据RDD的宽依赖划分stage,stage中又包含了task。每个stage中的task依赖上游stage中task的输出,上游task落盘称为shuffle写,下游task读称为shuffle读,上游task相当于MR的map阶段,下游task相当于MR的reduce阶段。不同stage间task的读写构成了spark的shuffle流程。
下游task(reduce端)会去上游task(map端)所在节点读取自己需要的分区数据。整个过程涉及到序列化、磁盘IO等操作。
shuffle write
- Data move from Executor(s) to another Executor(s) - is used when data needs to move between executors (e.g. due to JOIN, groupBy, etc)
第二部分 Shuffle的框架
2.1 Shuffle框架的演进
Spark Shuffle历史节点
Spark 0.8及以前 Hash Based Shuffle
Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出历史舞台
https://blog.csdn.net/ifenggege/article/details/107968518
2.2 Shuffle框架内核
第三部分 基于Hash的Shuffle
第四部分 基于Sort的Shuffle
第四部分 基于Tungsten的Shuffle
https://tech.meituan.com/2016/05/12/spark-tuning-pro.html
https://0x0fff.com/spark-architecture-shuffle/
第五部分 常见问题
- 磁盘临时文件空间不足
报错: java.io.IOException: No space left on device 处理: 在shuffle过程中,中间文件都放在/tmp目录,当shuffle文件达到磁盘空间上限,就报错。解决方法可以增大executor个数,分担压力,如果仍不可以的话就联系平台同学配置spark-defaults.conf中设置spark.local.dir(默认是/tmp)为磁盘空间足够的目录即可解决。在yarn模式则配置LOCAL_DIRS。
第六部分 附录
Shuffle 中的重要参数:
spark.local.dir
: Shuffle 缓存目录spark.shuffle.file.buffer
: shuffle write 阶段 buffer 缓冲大小,将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲写满之后,才会溢写到磁盘。默认值为 32K。spark.reducer.maxSizeInFlight
: shuffle read 阶段 buffer 缓冲区大小。默认值为 48M。spark.shuffle.io.maxRetries
: Shuffle read 阶段拉取数据失败时的最大重试次数。默认值 3。spark.shuffle.io.retryWait
: Shuffle read 阶段拉取数据失败重试时的等待时间。默认值 5s。spark.shuffle.sort.bypassMergeThreshold
: 使用 bypassMergeSortShuffleWriter 机制,RDD 分区数的限制阈值。默认值为 200。spark.memory.fraction & spark.memory.storageFraction
: 调整 Shuffle 相关内存所占的比例spark.memory.fraction
: 缺省值 0.6。存储内存和执行内存占(heap 内存 - 300M)的百分比spark.memory.storageFraction
: 缺省值 0.5 存储内存与 (存储内存与执行内存之和)的百分比spark.shuffle.manager
: 通过反射方式生成的 SortShuffleManager 的实例。默认为 SortShuffleManager。- Spark 1.5 以后,有三个可选项:hash、sort 和 tungsten-sort。
spark.shuffle.consolidateFiles
:spark.shuffle.mapOutput.minSizeForBroadcast
:默认值 512Kspark.shuffle.mapOutput.dispatcher.numThreads
: 默认值为 8,map 端输出派发线程池中的线程数
参考文献及资料
1、RuoYi-Cloud
项目文档,链接:https://blog.csdn.net/ifenggege/article/details/107968518