
PySpark Implementation Principles and Source Code Analysis

Contents

  • Background
  • References and resources

Background

https://mikolaje.github.io/2019/pyspark_slower.html

https://blog.csdn.net/oTengYue/article/details/105379628

https://www.readfog.com/a/1631040025628086272

To keep its core architecture unified, Spark wraps a Python layer around the JVM-based core. All of the core functionality — requesting compute resources, managing and dispatching tasks, communication between the driver and executors, communication among executors, and the data structures that back RDDs — is implemented on the JVM.
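
This JVM-centric layering is visible directly from the Python API: the Python SparkContext is a thin wrapper that talks to the driver-side JVM through a Py4J gateway. A minimal sketch of poking at that boundary (note that _gateway, _jsc and _jvm are internal PySpark attributes and may change between versions):

from pyspark import SparkContext

sc = SparkContext(appName="py4j-gateway-demo")

# _gateway is the Py4J JavaGateway connecting this Python process to the driver JVM
print(type(sc._gateway))

# _jsc is the JavaSparkContext living in the JVM; the Python sc only forwards calls to it
print(sc._jsc.sc().applicationId())

# _jvm gives Python code a view into arbitrary JVM classes via Py4J
print(sc._jvm.java.lang.System.getProperty("java.version"))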

This design makes it straightforward to extend Spark to other languages. The trade-off is equally clear: compared with a UDF running inside the JVM, executing a UDF in a Python worker adds the cost of serializing and deserializing data between the executor JVM and the Python worker, plus the communication IO, and Python is generally slower than Java at the same logic. For Spark jobs where computation dominates, a PySpark program built on custom UDFs therefore shows a noticeably larger performance penalty. Using Spark SQL built-in functions instead of custom UDFs reduces or removes this gap.
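
As a concrete, hypothetical illustration of that difference, the sketch below compares a Python UDF with the built-in upper function: the UDF sends every row to a Python worker and back, while the built-in expression runs entirely inside the executor JVM. The SQLContext entry point matches the Spark 1.x version shown in the logs below; the column name and data values are made up:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, upper, col
from pyspark.sql.types import StringType

sc = SparkContext(appName="udf-vs-builtin")
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([("alice",), ("bob",)], ["name"])

# Python UDF: each row is serialized, shipped to a Python worker, processed, shipped back
py_upper = udf(lambda s: s.upper() if s is not None else None, StringType())
df.withColumn("upper_py", py_upper(col("name"))).show()

# Built-in function: the whole expression is evaluated inside the executor JVM
df.withColumn("upper_builtin", upper(col("name"))).show()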

Job submission command:

[root@quickstart pysparkExample]# cat run.sh 
/usr/lib/spark/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--archives hdfs:///user/admin/python/python3.5.2.zip \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python3.5.2.zip/conda/bin/python \
test.py
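
The post does not show the contents of test.py; purely as an illustration of the kind of job such a submission could run, a minimal hypothetical script might look like this:

# test.py -- hypothetical minimal job matching the spark-submit command above
from pyspark import SparkContext

sc = SparkContext(appName="pysparkExample")

# a trivial distributed computation, enough to exercise the Python worker path
total = sc.parallelize(range(100)).map(lambda x: x * 2).sum()
print("sum of doubled 0..99 =", total)

sc.stop()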
[root@quickstart pysparkExample]# ./run.sh
21/05/16 00:02:32 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/172.17.0.2:8032
21/05/16 00:02:32 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
21/05/16 00:02:32 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (2816 MB per container)
21/05/16 00:02:32 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
21/05/16 00:02:32 INFO yarn.Client: Setting up container launch context for our AM
21/05/16 00:02:32 INFO yarn.Client: Setting up the launch environment for our AM container
21/05/16 00:02:32 INFO yarn.Client: Preparing resources for our AM container
21/05/16 00:02:33 INFO yarn.YarnSparkHadoopUtil: getting token for namenode: hdfs://quickstart.cloudera:8020/user/admin/.sparkStaging/application_1621088965108_0002
21/05/16 00:02:33 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 9 for admin on 172.17.0.2:8020
21/05/16 00:02:34 INFO hive.metastore: Trying to connect to metastore with URI thrift://quickstart.cloudera:9083
21/05/16 00:02:34 INFO hive.metastore: Opened a connection to metastore, current connections: 1
21/05/16 00:02:34 INFO hive.metastore: Connected to metastore.
21/05/16 00:02:34 INFO hive.metastore: Closed a connection to metastore, current connections: 0
21/05/16 00:02:34 INFO yarn.Client: Source and destination file systems are the same. Not copying hdfs:/user/admin/python/python3.5.2.zip
21/05/16 00:02:34 INFO yarn.Client: Uploading resource file:/home/pyspark/pysparkExample/test.py -> hdfs://quickstart.cloudera:8020/user/admin/.sparkStaging/application_1621088965108_0002/test.py
21/05/16 00:02:34 INFO yarn.Client: Uploading resource file:/tmp/spark-f65e84d5-0438-473c-9dff-03aeb95d4f18/__spark_conf__7787627711692930444.zip -> hdfs://quickstart.cloudera:8020/user/admin/.sparkStaging/application_1621088965108_0002/__spark_conf__7787627711692930444.zip
21/05/16 00:02:35 INFO spark.SecurityManager: Changing view acls to: root,admin
21/05/16 00:02:35 INFO spark.SecurityManager: Changing modify acls to: root,admin
21/05/16 00:02:35 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, admin); users with modify permissions: Set(root, admin)
21/05/16 00:02:35 INFO yarn.Client: Submitting application 2 to ResourceManager
21/05/16 00:02:35 INFO impl.YarnClientImpl: Submitted application application_1621088965108_0002
21/05/16 00:02:36 INFO yarn.Client: Application report for application_1621088965108_0002 (state: ACCEPTED)
21/05/16 00:02:36 INFO yarn.Client:
client token: Token { kind: YARN_CLIENT_TOKEN, service: }
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.admin
start time: 1621094555087
final status: UNDEFINED
tracking URL: http://quickstart.cloudera:8088/proxy/application_1621088965108_0002/
user: admin
21/05/16 00:02:37 INFO yarn.Client: Application report for application_1621088965108_0002 (state: ACCEPTED)
21/05/16 00:02:38 INFO yarn.Client: Application report for application_1621088965108_0002 (state: ACCEPTED)
21/05/16 00:02:39 INFO yarn.Client: Application report for application_1621088965108_0002 (state: ACCEPTED)
21/05/16 00:02:40 INFO yarn.Client: Application report for application_1621088965108_0002 (state: FINISHED)
21/05/16 00:02:40 INFO yarn.Client:
client token: Token { kind: YARN_CLIENT_TOKEN, service: }
diagnostics: N/A
ApplicationMaster host: 172.17.0.2
ApplicationMaster RPC port: 0
queue: root.admin
start time: 1621094555087
final status: SUCCEEDED
tracking URL: http://quickstart.cloudera:8088/proxy/application_1621088965108_0002/history/application_1621088965108_0002/1
user: admin
21/05/16 00:02:40 INFO yarn.Client: Deleting staging directory .sparkStaging/application_1621088965108_0002
21/05/16 00:02:40 INFO util.ShutdownHookManager: Shutdown hook called
21/05/16 00:02:40 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f65e84d5-0438-473c-9dff-03aeb95d4f18

YARN log:

21/05/15 16:19:29 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
21/05/15 16:19:30 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1621095402395_0001_000001
21/05/15 16:19:30 INFO spark.SecurityManager: Changing view acls to: admin
21/05/15 16:19:30 INFO spark.SecurityManager: Changing modify acls to: admin
21/05/15 16:19:30 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(admin); users with modify permissions: Set(admin)
21/05/15 16:19:30 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
21/05/15 16:19:30 INFO yarn.ApplicationMaster: Waiting for spark context initialization
21/05/15 16:19:30 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
21/05/15 16:19:31 INFO spark.SparkContext: Running Spark version 1.6.0
21/05/15 16:19:31 INFO spark.SecurityManager: Changing view acls to: admin
21/05/15 16:19:31 INFO spark.SecurityManager: Changing modify acls to: admin
21/05/15 16:19:31 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(admin); users with modify permissions: Set(admin)
21/05/15 16:19:31 INFO util.Utils: Successfully started service 'sparkDriver' on port 46545.
21/05/15 16:19:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
21/05/15 16:19:31 INFO Remoting: Starting remoting
21/05/15 16:19:31 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@172.17.0.2:35921]
21/05/15 16:19:31 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriverActorSystem@172.17.0.2:35921]
21/05/15 16:19:31 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 35921.
21/05/15 16:19:31 INFO spark.SparkEnv: Registering MapOutputTracker
21/05/15 16:19:31 INFO spark.SparkEnv: Registering BlockManagerMaster
21/05/15 16:19:31 INFO storage.DiskBlockManager: Created local directory at /yarn/nm/usercache/admin/appcache/application_1621095402395_0001/blockmgr-edbfee4f-522d-4c06-81a0-5b83b750e88a
21/05/15 16:19:31 INFO storage.MemoryStore: MemoryStore started with capacity 491.7 MB
21/05/15 16:19:31 INFO spark.SparkEnv: Registering OutputCommitCoordinator
21/05/15 16:19:31 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/05/15 16:19:31 INFO util.Utils: Successfully started service 'SparkUI' on port 43419.
21/05/15 16:19:31 INFO ui.SparkUI: Started SparkUI at http://172.17.0.2:43419
21/05/15 16:19:31 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler
21/05/15 16:19:31 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40989.
21/05/15 16:19:31 INFO netty.NettyBlockTransferService: Server created on 40989
21/05/15 16:19:31 INFO storage.BlockManager: external shuffle service port = 7337
21/05/15 16:19:31 INFO storage.BlockManagerMaster: Trying to register BlockManager
21/05/15 16:19:31 INFO storage.BlockManagerMasterEndpoint: Registering block manager 172.17.0.2:40989 with 491.7 MB RAM, BlockManagerId(driver, 172.17.0.2, 40989)
21/05/15 16:19:31 INFO storage.BlockManagerMaster: Registered BlockManager
21/05/15 16:19:32 INFO scheduler.EventLoggingListener: Logging events to hdfs://quickstart.cloudera:8020/user/spark/applicationHistory/application_1621095402395_0001_1
21/05/15 16:19:32 INFO cluster.YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
21/05/15 16:19:32 INFO cluster.YarnClusterScheduler: YarnClusterScheduler.postStartHook done
21/05/15 16:19:32 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark://YarnAM@172.17.0.2:46545)
21/05/15 16:19:32 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/172.17.0.2:8030
21/05/15 16:19:32 INFO yarn.YarnRMClient: Registering the ApplicationMaster
21/05/15 16:19:32 INFO yarn.ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
21/05/15 16:19:32 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
21/05/15 16:19:32 INFO spark.SparkContext: Invoking stop() from shutdown hook
21/05/15 16:19:32 INFO ui.SparkUI: Stopped Spark web UI at http://172.17.0.2:43419
21/05/15 16:19:32 INFO cluster.YarnClusterSchedulerBackend: Shutting down all executors
21/05/15 16:19:32 INFO cluster.YarnClusterSchedulerBackend: Asking each executor to shut down
21/05/15 16:19:32 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/05/15 16:19:32 INFO storage.MemoryStore: MemoryStore cleared
21/05/15 16:19:32 INFO storage.BlockManager: BlockManager stopped
21/05/15 16:19:32 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
21/05/15 16:19:32 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/05/15 16:19:32 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
21/05/15 16:19:32 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
21/05/15 16:19:32 INFO spark.SparkContext: Successfully stopped SparkContext
21/05/15 16:19:32 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
21/05/15 16:19:32 INFO Remoting: Remoting shut down
21/05/15 16:19:32 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
21/05/15 16:19:32 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
21/05/15 16:19:32 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1621095402395_0001
21/05/15 16:19:32 INFO util.ShutdownHookManager: Shutdown hook called
21/05/15 16:19:32 INFO util.ShutdownHookManager: Deleting directory /yarn/nm/usercache/admin/appcache/application_1621095402395_0001/spark-76c43a00-f562-4e4e-be1a-be3a2fcefc21/pyspark-b56e6390-71a2-4475-8ffe-4164798ab6c4
21/05/15 16:19:32 INFO util.ShutdownHookManager: Deleting directory /yarn/nm/usercache/admin/appcache/application_1621095402395_0001/spark-76c43a00-f562-4e4e-be1a-be3a2fcefc21

http://sharkdtu.com/posts/pyspark-internal.html

https://cloud.tencent.com/developer/article/1589011

https://cloud.tencent.com/developer/article/1558621

https://www.nativex.com/cn/blog/2019-12-27-2/

In big-data scenarios, however, the frequent data exchange between the JVM and the Python process costs a lot of performance, and in the worst cases the job can simply hang. For large-scale machine learning or Streaming workloads it is therefore safer to avoid PySpark and write the application in native Scala/Java; for simple offline jobs on small to medium data volumes, PySpark is a convenient way to develop and submit quickly.
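
As a rough sketch (not from the referenced articles) of where that JVM/Python boundary is crossed: any RDD transformation that takes a Python function pushes partition data out to Python worker processes, whereas DataFrame operations expressed with Column expressions are planned and executed inside the JVM:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col

sc = SparkContext(appName="boundary-demo")
sqlContext = SQLContext(sc)

# RDD + Python lambda: each partition is streamed to a Python worker,
# deserialized, processed, and serialized back to the JVM
rdd_sum = sc.parallelize(range(1000000)).map(lambda x: x + 1).sum()

# DataFrame + Column expression: the whole plan runs inside the executor JVM,
# no Python worker processes are involved
df_sum = sqlContext.range(0, 1000000).select((col("id") + 1).alias("v")) \
                   .groupBy().sum("v").collect()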

A brief look at the PySpark and Py4J threading model

https://www.jianshu.com/p/013fe44422c9

PySpark log output:

from pyspark import SparkContext
sc = SparkContext()
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger('MYLOGGER')

# same call as you'd make in java, just using the py4j methods to do so
LOGGER.setLevel(log4jLogger.Level.WARN)

# will no longer print
LOGGER.info("pyspark script logger initialized")

For Spark 1.5.1:

from pyspark import SparkContext
sc = SparkContext()
log4jLogger = sc._jvm.org.apache.log4j.Logger
LOGGER = log4jLogger.getLogger('MYLOGGER')

# same call as you'd make in java, just using the py4j methods to do so
# note: Level lives in the org.apache.log4j package, not on the Logger class
LOGGER.setLevel(sc._jvm.org.apache.log4j.Level.WARN)

# will no longer print
LOGGER.info("pyspark script logger initialized")

References and resources

1. Job Scheduling: https://spark.apache.org/docs/latest/job-scheduling.html#configuration-and-setup
