目录
- 背景
- 第一部分 环境准备
- 第二部分 原理解析
- 第三部分 案例运行
- 第四部分 线上架构设计
- 参考文献及资料
背景
目前Flink逐步成为企业级大数据平台的使用最广泛的实时计算框架,特别在构建TB级别的实时数仓场景。Flink经历大量企业级线上环境的业务考验。
那么Flink的强大实时计算能力能否赋能给机器学习场景呢?这就是PyFlink项目的目的。我们知道机器学习主要是基于Python语言生态圈。但是Python(Cpython)语言是一个单核语言(即全局解析锁GIL),限制了单机处理性能。但是有了Flink赋能,Python机器学习可以分布式实时处理,就能大大提升机器学习的处理能力。
本文将介绍PyFlink项目的实现原理、环境部署、机器学习案例运行,最后对实际线上架构部署提出架构建议。
第一部分 环境准备
1.1 Hadoop Yarn集群准备
测试环境我们使用CDH开源集群,Flink的运行方式采用Flink on Yarn模式。单机测试的时候还有下面几种模式:
- Local-SingleJVM 模式,开发测试使用,所有角色TM、JM 都在同一个 JVM 里面,线程模拟;
- Local-SingleNode 模式,开发测试使用,运行在单机,进程模拟,伪分布式。
- Cluster 模式,Flink自带集群模式,即Standalone集群;
- k8s模式,k8s云模式部署;
1.1 Flink准备
Flink 版本采用最新(2022年1月15日)稳定版:Apache Flink 1.14.2。
1.2 Python环境准备
Python环境我们使用Aconda环境部署,Python内核版本为Python3.7.6。目前官网要求版本为: 3.6 或 3.7+,否则会出错。
另外需要部署安装PyFlink包。从flink1.10开始,PyFlink安装无需编译源码.
安装命令如下(注意对应的Flink版本):
1 | root@deeplearning:~# pip install apache-flink==1.14.2 |
由于墙的原因建议指定国内源(使用清华大学源:https://pypi.tuna.tsinghua.edu.cn/simple/),否则会很慢:
1 | root@deeplearning:~# pip install apache-flink==1.14.2 -i https://pypi.tuna.tsinghua.edu.cn/simple/ |
1.3 其他环境
- 集群(每个节点)的JAVA_HOME要求1.8或者1.11。
第二部分 原理解析
2.1 运行原理
Flink项目组并没有使用Python语言重新实现Flink引擎,而是基于最小化设计原则(以最小的成本实现既定目标),在Flink Java核心外面套上一层Python API,重用现有的Java核心引擎。整个设计实现类似PySpark。
从Flink版本演进上看,主要提供能力有:
Flink 1.8.x,开始对Python支持;
存在的问题:支持Datase/Stream 两个独立实现的API,底层使用JPython实现。
Flink 1.9.x,对Table的支持;
Flink 1.10.x,增加Python UDFs的支持,在 Table API/SQL 中注册并使用自定义函数;
Flink 1.11.x,Pandas UDF 和用户自定义的 Metrics;
2.1.1 通讯选型
Flink赋能Python最核心的问题是:Java(Flink为Java研发)和Python进程如何实现通讯。在Flink 1.8.x版本的时候使用Jython来实现,性能上比cpython要快。但是机器学习等生态包支持上,远不如cpython。所以从生态融合上看选择Cpython才是正确的方向。
Java和Python通信有两种解法。
第一种,第三方实现,即实现一个统一的大数据处理管道(pipline),并支持多语言开发,通信由管道平台统一处理。这就是Google开源的Apache Beam(如下图)。
通常流程是:用户使用Apache Beam Python SDK编写数据处理管道,选择runner为FlinkRunner,最后将代码提交至Flink集群运行。
- 第二种,由Flink本身在不改变源Java内核的前提下,外层套上一层薄薄的API,然后Python进程通过和这层API通信实现和Java虚拟机内部的通信。这就是大名鼎鼎的Py4J,即Py4J 作为 Java VM 和 Python 解释器之间通讯的桥梁(如下图)。
第一种解法需要依赖Apache Beam SDK能力和生态,为了通用性,必然要牺牲灵活性,参考Beam的外部I/O的支持清单。
最终Flink选择了第二种解法。其实熟悉Spark和PySpark项目的同学应该知道Spark赋能Python也同样选择Py4J的架构(经历过线上实践考验的)。
Py4J库分为Java和Python两部分,基本原理是:
- Java侧,通过
py4j.GatewayServer
,启动一个GatewayServer,监听一个tcp socket(记做server_socket),用于接收Python侧的请求。 - Python侧,启动一个Geteway。通过Socket访问JVM中对象或者调用方法。
- Python侧在创建
JavaGateway
对象时,可以选择同时创建一个CallbackServer
,它会在Python侧监听一个tcp socket(记做callback_socket),用来给Java回调Python代码提供一条渠道。 - Python 这边创建一个 table 对象的时候,它也会在相应的 Java 这边创建一个相同 table 对象。如果创建一个 TableEnvironment 对象,在 Java 部分也会创建一个 TableEnvironment 对象。调用 table 对象上的方法,那么也会映射到 Java 这边,所以是一个一一映射的关系。
- Py4J提供了一套文本协议用来在tcp socket间传递命令。
基于这样的设计,如果用户使用 Python Table API 写出了一个作业(没有 Python UDF 的时候),那么这个作业的性能和用 Java 写出来的作业性能是一样的。因为底层的架构都是同一套 Java 的架构。但是需要考虑socket通信的消耗。
2.1.2 PyFlink UDF
https://www.modb.pro/db/128622
https://developer.aliyun.com/article/738962
2.2 提交任务
参数清单:
-py,--python
Python script with the program entry. The dependent resources can be configured with the
--pyFiles
option.说明:Python程序脚本作为入口程序。例如:
-py /home/flink-1.14.2/examples/python/table/word_count.py
,指定入口程序脚本本地路径。-pym,--pyModule
Python module with the program entry point. This option must be used in conjunction with
--pyFiles
.说明:这个参数和
--pyFiles
参数结合使用。例如这个参数案例:-pym table_api_demo -pyfs file:///path/to/table_api_demo.py
;其中pyfs
参数指定了入口文件的路径(也可分布式文件系统),pym
指定了入口程序脚本名(使用pyfs
参数中文件)。-pyfs,--pyFiles
Attach custom files for job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Files suffixed with .zip will be extracted and added to PYTHONPATH. Comma (‘,’) could be used as the separator to specify multiple files (e.g., –pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
说明:该参数主要用于上传运行依赖的客户化脚本文件。文件类型支持:.py/.egg/.zip/.whl,甚至目录(没有验证过)。这些文件最后会追加在客户端和远端的Python检索路径中(
PYTHONPATH
)中。多个文件(目录)需要使用英文逗号间隔。对于zip类文件,解压后,路径追加Python检索路径。-pyarch,--pyArchives
Add python archive files for job. The archive files will be extracted to the working directory of python UDF worker. For each archive file, a target directory be specified. If the target directory name is specified, the archive file will be extracted to a directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name of the archive file. The files uploaded via this option are accessible via relative path. ‘#’ could be used as the separator of the archive file path and the target directory name. Comma (‘,’) could be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment, the data files used in Python UDF (e.g., –pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data –pyExecutable py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: f = open(‘data/data.txt’, ‘r’).
说明:参数指定需要上传的压缩包文件(通常是Python SDK编译环境文件系统,也可以数据类的依赖包)。上传后压缩文件会在目的节点侧进行文件解压。解压的目标文件目录名可以通过
#
符号来指定(例如:file:///tmp/data.zip#data,解压后目录名即为data,如果压缩包中有文件data.txt,那么python程序就可用通过下面的路径进行访问:f = open(‘data/data.txt’, ‘r’))。多个压缩文件使用英文逗号间隔。-pyclientexec,--pyClientExecutable
The path of the Python interpreter used to launch the Python process when submitting the Python jobs via \”flink run\” or compiling the Java/Scala jobs containing Python UDFs. (e.g., –pyArchives file:///tmp/py37.zip –pyClientExecutable py37.zip/py37/python)
说明:参数指定了flink 客户端侧python的编译环境。例如:参数–pyArchives file:///tmp/py37.zip 指定了python的SDK包,解压后路径为:py37.zip/py37/python,所以通过这个参数可以指定客户端侧的python解析环境路径:–pyClientExecutable py37.zip/py37/python
-pyexec,--pyExecutable
Specify the path of the python interpreter used to execute the python UDF worker (e.g.: –pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.6+, Apache Beam (version == 2.27.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
说明:类似-pyclientexec,-pyexec参数用来指定执行器的python编译环境,不再赘述。需要注意的是执行器侧python环境需要依赖包版本。
-pyreq,--pyRequirements
Specify the requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies could be specified optionally. Use ‘#’ as the separator if the optional parameter exists (e.g., –pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
说明:参数用来指定第三方依赖包。
第三部分 案例运行
3.1 Hello World案例运行
我们首先运行一个Flink项目自带的简单案例:Hello World。下面是提交命令:
1 | run.sh 脚本内容(其中路径自行调整) |
这样Yarn上就会运行一个Flink任务。
注:Pyflink其中依赖包pyarrow包需要GLIBC 2.14,所以需要注意集群节点操作系统的版本需要大于这个最低要求。
3.2 在线机器学习
3.2.1 背景
第四部分 线上架构设计
https://github.com/uncleguanghui/pyflink_learn/blob/master/examples/README.md
https://cloud.tencent.com/developer/article/1651257
https://zhuanlan.zhihu.com/p/114717285
参考文献及资料
1、PyFlink
项目文档,链接:https://nightlies.apache.org/flink/flink-docs-release-1.14/api/python/
2、The Flink Ecosystem: A Quick Start to PyFlink,链接:https://alibaba-cloud.medium.com/the-flink-ecosystem-a-quick-start-to-pyflink-6ad09560bf50