Fork me on GitHub

Python系列文章-Python并发和并行编程总结

目录

  • 背景

  • 第一部分 CPython全局解释器锁问题

  • 第二部分 Python中的多线程实现

  • 第三部分 Python中的多进程实现

  • 第四部分 高级应用

  • 参考文献及资料

背景

在讲解正文之前我们先回忆明确一些计算机操作系统的概念。

0.1 线程和进程

进程(Process)和线程(Thread)是计算机操作系统中的重要概念。进程是资源分配的基本单位,一个进程可以包括至少一个数量的线程。线程是程序执行流的最小单位,它隶属一个进程。并与进程的其他线程共享分配资源。例如在操作系统中打开浏览器,就相当于启动了浏览器的进程。在浏览器界面中,我们可以看视频、看网页内容、刷弹幕、留言等,这些就是线程。

0.2 多线程和多进程

现代操作系统都是多任务,这里的任务可以理解为进程。多进程即同时执行多个进程,多线程即同时执行多个线程。

0.3 并发和并行

由于CPU执行代码都是顺序执行的,为了实现多任务同时执行,操作系统将CPU使用时间进行切片,各任务交替执行(此时其他任务挂起)。实际上由于CPU运行速度较快,会让人类感觉(错觉)同时有多个线程在执行。这种交错执行程序的方式称为:并发(concurrency)。

当操作系统资源有多个CPU时,这时候任务可以做到真正的同时执行,程序指令可以同时分别运行在多个CPU核心上,通常称为:并行(parallelism)。

在程序设计中,一些需求如果实现分布式处理,将提高程序运行效率和系统整体性能。其实原理是将一个进程分成多个线程,然后让它们并发异步执行,从而提高运行效率。

Python语言既支持多线程,也支持多进程。

0.4 I/O密集和CPU密集

计算机中任务可以分为计算密集型(也称为CPU密集型)和I/O密集型两类。计算密集型任务,显而易见的特点是要进行大量的计算,消耗CPU资源,比如科学计算、对视频进行高清解码等等,全靠CPU的运算能力。

I/O密集型主要是网络、磁盘I/O较多的任务。这类任务的特点是CPU消耗较少,大部分时间都在等待I/O操作完成(因为I/O的速度远远低于CPU和内存的速度)。

第一部分 CPython全局解释器锁问题

1.1 什么是GIL机制

Python是一种编程语言,需要其他语言(或Python自己)实现它的解释器。目前使用最广泛的解释器是C语言实现的,所以称为CPython。例如Linuxos等操作系统通常自带的均是CPython,另外机器学习生态圈使用较多的Anaconda也是CPython解释器。

注:除了CPython,其实还有java实现的Jyphon解释器、RPython实现的Pypy解释器。

可以使用sys包中implementation方法查看解释器的类型:

1
2
3
4
> # print(sys.implementation)
> namespace(cache_tag='cpython-35', hexversion=50660336, name='cpython', version=s
> ys.version_info(major=3, minor=5, micro=3, releaselevel='final', serial=0))
>

CPython有两个缺点:

  • CPython不支持即时编译。
  • CPython编译器的GIL全局锁的机制。

GILGlobal Interpreter Lock)是一个互斥锁(mutex),保证同一时刻只有一个线程控制解释器(即任何时间点只能一个线程处于运行状态)。通俗的理解,GIL相当于线程运行在CPU上的通行证,一个Python进程(也称为Python虚拟机)同时只能容许一个线程在CPU上执行,没有通行证的线程只能挂起等待。对于单个核心的CPU,这就是正常的并发。但是对于多核心的CPUGIL仍然限制不能并行执行,这样就不能发挥最大计算资源优势。

显然,由于GIL机制,CPython多线程程序在多核心CPU架构下运行效率低下。另外需要注意的是:仅CPython解释器上存在GIL

1.2 为什么要引入GIL机制

Python中垃圾回收有个“引用计数”机制。Python登记每个对象的引用次数,当引用次数为0时候,Python将这个对象(Python中一切皆对象)从内存中删除,释放资源。可以体会下面的例子:

1
2
3
4
5
6
7
8
9
testlist = [1,2]
print(sys.getrefcount(testlist))
# 输出是:2。需要注意的是getrefcount函数的本身引用也算1次。
a = testlist
print(sys.getrefcount(testlist))
# 输出是:3
del a
print(sys.getrefcount(testlist))
# 输出是:2。删除了变量a,对象引用数减少1。

在这种机制下,如果同时存在两个线程(不通信状态)对同一个对象增加或者减少其引用值,就会存在一定概率导致内存泄漏和内存错误释放,表现为程序崩溃或出现未知异常错误。为了保证对象数据的一致性安全,于是考虑增加锁机制。

如果对每个对象添加锁机制,在多锁机制下,又会存在死锁现象。综合以上因素和权衡,Python直接对解释器整体加了一把锁,这就是GIL

1.3 GIL机制对于多线程程序的影响

CPython引入GIL机制后,到底对多线程程序运行有多大影响呢?下面分别对CPUI/O密集型场景进行分析:

  • CPU密集型代码(各种循环处理、计数、搜索、矩阵计算等)

    在这种情况下,ticks计数(计步(ticks)可粗略看作Python虚拟机的指令)很快就会达到阈值,然后触发GIL的释放与再竞争(多个线程来回切换需要消耗资源的),所以对CPU密集型代码并不友好。在python 3.2中,GIL不再使用ticks计数,改为使用计时器(执行时间达到阈值后,当前线程释放GIL,阀值默认为5ms,可设置),这样对CPU密集型程序会更加友好。

  • I/O密集型代码(文件处理、网络爬虫等)

    多线程能够有效提升效率(单线程下有I/O操作会进行I/O等待,造成不必要的时间浪费,而开启多线程能在线程A I/O等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。所以对I/O密集型代码比较友好。

具体对比测试数据可以参考文章:http://cenalulu.github.io/python/gil-in-python/

1.4 为什么不根本解决GIL

Guido van Rossum 在创造Python的时候,时间是90年代(1990年代)。那时候硬件条件大多数是单核CPU。随着CPU多核硬件条件的实现,能否解决GIL机制呢?CPython的作者和BDFLGuido van Rossum,在20079月的文章It isn’t Easy to Remove the GIL给出了社区回应:

I’d welcome it if someone did another experiment along the lines of Greg’s patch (which I haven’t found online), and I’d welcome a set of patches into Py3k only ifthe performance for a single-threaded program (and for a multi-threaded but I/O-bound program) does not decrease.

此后的任何尝试都没有实现这一条件。

第二部分 Python中的多线程实现

Python通过两个标准库threadthreading提供对线程编程的支持。从Python 1.5.2版本开始支持threading模块,另外需要注意的是在Python 3thread重命名为_threadThreading模块是基于thread\_thread)基础上构建了更易用的多线程支持(threading模块对thread模块进行了封装,更便于使用),建议使用更强大的Threading模块。

thread_thread)和threading均是CPython中的内置模块。

2.1 简单例子

下面的例子是一个单线程的程序实现。多个work任务串行执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
import threading

def work():
print(threading.currentThread().getName()+" "+"start")
time.sleep(1)
print(threading.currentThread().getName() + " " + "end")

if __name__ == '__main__':
startTime = time.time()
for i in range(4):
work()
print(time.time()-startTime)
"""
MainThread start
MainThread end
MainThread start
MainThread end
MainThread start
MainThread end
MainThread start
MainThread end
4.0
"""

如果我们使用多线程编程实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
import threading

def work():
print(threading.currentThread().getName()+" "+"start")
time.sleep(1)
print(threading.currentThread().getName() + " " + "end")

if __name__ == '__main__':
startTime = time.time()
for i in range(4):
my_thread = threading.Thread(target=work)
my_thread.start()
print(time.time()-startTime)
"""
Thread-1 start
Thread-2 start
Thread-3 start
Thread-4 start
0.0009999275207519531
Thread-3 end
Thread-2 end
Thread-1 end
Thread-4 end
"""

上面的例子能看出线程并没有串行执行,在线程sleep等待时,就会顺序将print语句执行。

2.2 GIL机制下单线程和多线程的比较

下面的简单的案例,我们设计了两个CPUI/O繁忙的程序对比单线程和多线程效率的差异。使用timeit模块计时。测试机为个人笔记本多核心CPU(4核)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import threading

def CpuWork():
temp = 0
for i in range(100):
temp = temp + 1

def IoWork():
fo = open("python.text", "w+")
fo.write("testwrite\n")
fo.readlines()
fo.close()

def CpuWorkSigle():
for i in range(100):
CpuWork()

def CpuWorkTread():
for i in range(100):
t = threading.Thread(target=CpuWork())
t.start()

def IoWorkSigle():
for i in range(100):
IoWork()

def IoWorkTread():
for i in range(100):
t = threading.Thread(target=IoWork())
t.start()

if __name__ == '__main__':
from timeit import timeit

CpuWorkSigleT = timeit('CpuWorkSigle()',
'from __main__ import CpuWorkSigle',
number=10000)
CpuWorkTreadT = timeit('CpuWorkTread()',
'from __main__ import CpuWorkTread',
number=10000)
print("CpuWorkSigleT", CpuWorkSigleT)
print("CpuWorkTreadT", CpuWorkTreadT)

IoWorkSigleT = timeit('IoWorkSigle()',
'from __main__ import IoWorkSigle',
number=100)
IoWorkTreadT = timeit('IoWorkTread()',
'from __main__ import IoWorkTread',
number=100)
print("IoWorkSigleT", IoWorkSigleT)
print("IoWorkTreadT", IoWorkTreadT)
"""
CpuWorkSigleT 5.429784478290195
CpuWorkTreadT 170.5910518324614
IoWorkSigleT 24.924760018310337
IoWorkTreadT 29.044745530459664
"""

测试结果发现对于CPUI/O繁忙的程序有两个测试现象:

  • 单线程任务均比多线程任务快。
  • 对于CPU密集型任务,多线程与单线程相差较大。

就算GIL机制下不能实现并行运行,为什么多线程任务比单线程任务还要慢呢?特别是CPU密集型任务差距特别大。那是因为多线程涉及到CPU上下文切换、锁机制处理(获取锁、释放锁等)。

CPU上下文切换(context switch),也称做进程切换或者任务切换,是指CPU从一个进程或线程切换到另一个进程或线程。上下文切换对系统来说意味着消耗大量的 CPU 时间。上下文切换过高,会导致CPU像个搬运工,频繁在寄存器和运行队列之间奔波,更多时间消耗在线程切换,而不是真正工作的线程上。直接的消耗包括CPU寄存器需要保存和加载,系统调度器的代码需要执行。间接消耗在于多核cache之间的共享数据。

而对于IO密集型任务,多线程忙于单线程,但差距不大。这应该是归功于IO等待时间(当线程遇到IO操作会主动释放GIL锁),可以运行其他线程。而单线程需要严格顺序等待。

2.3 threading.RLockthreading.Lock

在介绍线程锁之前,需要介绍一个概念:线程安全。我们知道操作系统中进程是最小的资源单位,线程是最小的执行单位。同一个进程中不同线程共享资源。既然是共享,就需要管理资源状态。我们先看看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from threading import Thread

total = 0

def add_total(amount):
global total
#lock.acquire()
for _ in range(100000):
total += amount
#lock.release()

def reduce_total(amount):
global total
#lock.acquire()
for _ in range(100000):
total -= amount
#lock.release()

if __name__ == '__main__':
# lock = threading.Lock()
thread_01 = Thread(target=add_total, args=(100,))
thread_02 = Thread(target=reduce_total, args=(100,))
thread_01.start()
thread_02.start()

thread_01.join()
thread_02.join()

print(total)

例子中每次执行后,程序打印的total值是不同。如果按照朴素的理解,两个线程一个减100,一个加100,并且执行的次数相同。那么最后的结果应该是0。然而并不是这样的。

例如当reduce_total线程拿到值是0,还没减100或者没有更新。同时add_total线程同时也拿到值是0,并且快速更新值为100。这时候reduce_total线程才开始更新,会把值更新为-100。多次发生这类场景后,值就是不确定的。

为了保证数据的一致性,引入了锁的概念。threading包使用中,将上面代码中注释内容释放后,再次测试运行,结果是始终为:0。

事实上,调整后的程序,线程会严格顺序(即串行执行)。每个线程执行完才会释放锁资源。

另外还有RLock锁(可重入锁),它允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquirerelease必须成对出现,即调用了nacquire,必须调用n次的release才能真正释放所占用的琐。

两个锁的区别,我们举个栗子来比较一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading

def lockTest():
lock = threading.Lock() #Lock对象
lock.acquire()
lock.acquire() #产生了死琐
lock.release()
lock.release()

def rlockTest():
rLock = threading.RLock() #RLock对象
rLock.acquire()
rLock.acquire() #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()

if __name__ == '__main__':
# lockTest()
rlockTest()

2.4 threading.Semaphore信号量对象

信号量的概念是由计算机科学家艾兹赫尔·戴克斯特拉(Edsger W. Dijkstra)发明的,操作系统用来解决并发中的互斥和同步问题的一种方法。

信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区。

Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

举一个简单的栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
import threading

semaphore = threading.Semaphore(3)
def semaphoreTest():
if semaphore.acquire():
for i in range(3):
time.sleep(1)
print(threading.currentThread().getName() + '获取锁')
semaphore.release()
print(threading.currentThread().getName() + '-释放锁')

if __name__ == '__main__':
for i in range(4):
t1 = threading.Thread(target=semaphoreTest)
t1.start()
"""
Thread-1获取锁
Thread-3获取锁
Thread-2获取锁
Thread-1获取锁
Thread-3获取锁
Thread-2获取锁
Thread-1获取锁
Thread-1-释放锁 #释放
Thread-3获取锁
Thread-3-释放锁 #释放
Thread-2获取锁
Thread-2-释放锁 #释放
Thread-4获取锁
Thread-4获取锁
Thread-4获取锁
Thread-4 释放锁
"""

线程名Thread-4 在Thread-1、Thread-2、Thread-3释放锁资源后

2.5 Event事件

2.6 threading.Condition

2.7 Quences

第三部分 Python中的多进程实现

既然Python对于多线程支持有天生的缺陷,那么如何解决呢?Python 在2.6版本后引入了multiprocessing包,提供了多进程并发的接口。这样每个进程都有自己的GIL,避免了线程之间挣抢GIL。

Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__':的方式,显然这只能用于调试和学习,不能用于实际环境。

https://anyisalin.github.io/2017/03/12/python-multithread/

https://zhuanlan.zhihu.com/p/46378282

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/03_how_to_define_a_thread.html

第四部分 附录

4.1 threading 包的使用

4.2

参考文献及资料

1、Python的GIL是什么鬼,多线程性能究竟如何,链接:http://cenalulu.github.io/python/gil-in-python/

2、It isn’t Easy to Remove the GIL,链接:https://www.artima.com/weblogs/viewpost.jsp?thread=214235

3、深入理解Linux的CPU上下文切换,链接:https://www.linuxblogs.cn/articles/linux-context-switch.html

4、Python 最难的问题,链接:https://zhuanlan.zhihu.com/p/32284710

0%