
A Collection of Spark Development Errors

Background

This post collects pitfalls encountered in day-to-day development work, mainly Spark-related.

Part 1 Error keyword: System memory must be at least

1.1 Error background

When debugging a Spark program in a local IntelliJ IDEA environment on Windows, the following error is thrown:

21/03/29 12:28:36 ERROR SparkContext: Error initializing SparkContext. 
java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.

1.2 Cause analysis

The error shows that SparkContext failed to initialize: the available memory is insufficient, and at least 471859200 bytes are required.

1.2.1 How Spark runs in local mode

In local mode, Spark uses threads to simulate executor processes, so the entire "cluster" runs inside a single JVM process.
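
For reference, a minimal local-mode program looks like the sketch below (the object and application names are placeholders); the driver and the simulated executors all share the heap of this single JVM:

import org.apache.spark.sql.SparkSession

object LocalModeExample {
  def main(args: Array[String]): Unit = {
    // master("local[*]") runs driver and executors as threads inside this one JVM process.
    val spark = SparkSession.builder()
      .appName("local-debug")
      .master("local[*]")
      .getOrCreate()

    println(spark.sparkContext.parallelize(1 to 10).sum())
    spark.stop()
  }
}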

1.2.2 Source code analysis

The relevant source (Spark 3.1.0, org.apache.spark.memory.UnifiedMemoryManager):

object UnifiedMemoryManager {

  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
  // the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.get(config.MEMORY_STORAGE_FRACTION)).toLong,
      numCores = numCores)
  }

  /**
   * Return the total amount of memory shared between execution and storage, in bytes.
   */
  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.get(TEST_MEMORY)
    val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
      if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
    }
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains(config.EXECUTOR_MEMORY)) {
      val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.")
      }
    }
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.get(config.MEMORY_FRACTION)
    (usableMemory * memoryFraction).toLong
  }
}

Key points:

  • The systemMemory variable defines the available system memory.

    package org.apache.spark.internal.config

    val TEST_MEMORY = ConfigBuilder("spark.testing.memory")
      .version("1.6.0")
      .longConf
      .createWithDefault(Runtime.getRuntime.maxMemory)

    The default value is Runtime.getRuntime.maxMemory, i.e. the maximum heap the JVM can obtain from the operating system. If the JVM is started without an -Xmx option, this typically ends up around 256 MB (256*1024*1024 bytes); see the check snippet after this list.

  • The reservedMemory variable is the memory reserved for the system. The value of TEST_RESERVED_MEMORY takes precedence; its default is an expression: 0 if IS_TESTING is set (test mode), otherwise RESERVED_SYSTEM_MEMORY_BYTES = 300 MB.

    val TEST_RESERVED_MEMORY = ConfigBuilder("spark.testing.reservedMemory")
      .version("1.6.0")
      .longConf
      .createOptional

    val IS_TESTING = ConfigBuilder("spark.testing")
      .version("1.0.1")
      .booleanConf
      .createOptional

    // definition
    private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
  • The minSystemMemory variable is the minimum required system memory, defined as 1.5 times reservedMemory.
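
To see what Runtime.getRuntime.maxMemory actually reports on your machine, a trivial check program (a sketch; the object name is arbitrary) can be run with and without -Xmx:

object MaxMemoryCheck {
  def main(args: Array[String]): Unit = {
    // Without -Xmx this prints the JVM's default maximum heap; with -Xmx1024m it prints roughly 1 GB.
    val maxBytes = Runtime.getRuntime.maxMemory
    println(s"Runtime.getRuntime.maxMemory = $maxBytes bytes (${maxBytes / 1024 / 1024} MB)")
  }
}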

From the error message, it is clearly the following branch that fires:

if (systemMemory < minSystemMemory) {
  throw new IllegalArgumentException(s"System memory $systemMemory must " +
    s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
    s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
}

In our Spark application none of these parameters were configured, so reservedMemory is 300 MB and minSystemMemory is therefore 450 MB, while no JVM options were set, leaving systemMemory at its default of roughly 256 MB. The condition systemMemory < minSystemMemory is obviously triggered.
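
The arithmetic can be checked directly against the numbers in the error message (a small sketch; the systemMemory value is simply the one this particular IDE run reported):

// reproduce the threshold computation from getMaxMemory
val reservedMemory  = 300L * 1024 * 1024                  // 314572800 bytes (RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong  // 471859200 bytes, the "must be at least" value
val systemMemory    = 259522560L                          // Runtime.getRuntime.maxMemory in this IDE run
assert(systemMemory < minSystemMemory)                    // the branch that throws IllegalArgumentException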

The default Xmx value of your Java environment can be checked with the following commands:

#In Windows:
java -XX:+PrintFlagsFinal -version | findstr /i "HeapSize PermSize ThreadStackSize"

#In Linux:
java -XX:+PrintFlagsFinal -version | grep -iE 'HeapSize|PermSize|ThreadStackSize'

1.3 Fixes

Based on the source analysis above, the following fixes are available.

1.3.1 Production environment

For production code, the source analysis shows the JVM needs at least 450 MB of heap.

  • Configure the -Xmx JVM option (or, equivalently, --driver-memory / spark.driver.memory, as the error message itself suggests) so that it is comfortably above 450 MB, for example:

    -Xmx1024m

1.3.2 Testing and debugging

  • Configure the spark.testing.memory parameter

    In this case systemMemory takes the value of the spark.testing.memory parameter, for example:

    val sparkConf = new SparkConf()
      .set("spark.testing.memory", "2147480000") // 2147480000 bytes ≈ 2 GB
  • Enable test mode (IS_TESTING = true)

    val sparkConf = new SparkConf()
      .set("spark.testing", "true")

    In this case minSystemMemory becomes 0, so the exception condition is never triggered.

  • Set the spark.testing.reservedMemory parameter to a value as small as possible

    val sparkConf = new SparkConf()
      .set("spark.testing.reservedMemory", "0")

    With this setting, minSystemMemory also becomes 0. A combined sketch of these debugging options follows below.
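
For local IDE debugging, any one of the options above is enough. A minimal combined sketch (the application name is a placeholder; these settings are for debugging only, never for production):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Bypasses the 450 MB startup check by pretending the JVM has ~2 GB of heap.
val sparkConf = new SparkConf()
  .setAppName("local-debug")
  .setMaster("local[*]")
  .set("spark.testing.memory", "2147480000")

val spark = SparkSession.builder().config(sparkConf).getOrCreate()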

1.4 Summary

Spark enforces a minimum memory threshold at startup; to lower that threshold you must explicitly set the testing-related configuration options.

Part 2 Error keyword: contains a task of very large size

1.1 Error background

Stage 0 contains a task of very large size (183239 KB). The maximum recommended task size is 100 KB.

1.2 Cause analysis

This error message means that some rather large objects are being serialized on the driver and sent to the executors as part of each task.

Spark's RPC layer limits the size of serialized messages; the default limit is 128 MB (131072 KB, i.e. 134217728 bytes). With tasks this large, the spark.rpc.message.maxSize setting therefore needs to be increased.
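
Besides raising the RPC limit, it is worth asking why the tasks are so large in the first place. A common cause is a closure that captures a large driver-side object, which is then serialized into every task; wrapping it in a broadcast variable usually shrinks the tasks dramatically. A self-contained sketch (the data here is synthetic, purely for illustration):

import org.apache.spark.sql.SparkSession

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("broadcast-sketch").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // A large driver-side object; captured directly by a closure it would be
    // serialized into every task, producing "task of very large size" warnings.
    val lookupTable: Map[Int, String] = (1 to 1000000).map(i => i -> s"name-$i").toMap

    // Broadcast it instead: shipped to each executor once, so tasks stay small.
    val lookupBc = sc.broadcast(lookupTable)
    val withNames = sc.parallelize(1 to 100).map(i => lookupBc.value.getOrElse(i, "unknown"))

    println(withNames.take(5).mkString(", "))
    spark.stop()
  }
}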

1.3 Fix

In the driver program, increase the value of spark.rpc.message.maxSize, for example to 1024 MB:

--conf spark.rpc.message.maxSize=1024
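
The same setting can also be applied programmatically when building the SparkConf (a sketch; the value is interpreted in MB):

import org.apache.spark.SparkConf

// Equivalent to passing --conf spark.rpc.message.maxSize=1024 on the command line.
val conf = new SparkConf().set("spark.rpc.message.maxSize", "1024")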

1.4 Summary

Nothing to add for now.

Part 3 Error keyword: Spark java.lang.UnsupportedClassVersionError: xxxxxx: Unsupported major.minor version 52.0

1.1 Error background

[root@quickstart bin]# ./run-example org.apache.spark.examples.SparkPi
Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/spark/launcher/Main : Unsupported major.minor version 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)

1.2 Cause analysis

The message "Unsupported major.minor version 52.0" means the runtime JDK is older than 1.8: class file major version 52 corresponds to Java 8 (51 is Java 7, 50 is Java 6). In other words, the classes were compiled with JDK 1.8, but the JVM loading them is older.

Check the JAVA_HOME environment variable on the local machine:

[root@quickstart home]# env|grep JAVA_HOME
JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera

However, the Spark version that actually needs to run is Spark 2.3.0, which requires Java 8.
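
As a quick cross-check from code (a trivial sketch), the Java version a Scala or Spark process is actually running on can be printed directly:

// Prints e.g. "1.7.0_67" or "1.8.0_41"; Spark 2.x launcher classes need 1.8 or later.
println(sys.props("java.version"))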

1.3 Fix

Point JAVA_HOME at a JDK 1.8 installation, then verify:

[root@quickstart bin]# java -version
openjdk version "1.8.0_41"
OpenJDK Runtime Environment (build 1.8.0_41-b04)
OpenJDK 64-Bit Server VM (build 25.40-b25, mixed mode)

1.4 Summary

References and resources

https://blog.csdn.net/wangshuminjava/article/details/79792961
