python并发编程入门:(五)asyncio
其他内容见:
这是本系列的最后一篇文章,花了一个周末,终于可以收个尾。之前已经讲述过多线程、多进程的内容,见上文章链接。
在讲asyncio之前,先要介绍一个概念协程(Coroutine)。
协程是在用户态实现的上下文切换技术,因此操作系统不知道协程的存在。那么什么是上下文,简单来说就是栈、寄存器、命名空间等等。线程、进程就是操作系统帮你实现的上下文切换技术,你无需关心进程调度。
首先,协程不是由操作系统创建管理的,是在用户空间,由程序猿人为创造的。这与线程、进程不同。
其次,协程对标的是线程,代替多线程,准确来说是一种微线程。为什么要代替线程呢?
线程缺点:线程切换开销太大(相比进程还算小);为了同步互斥,线程要加信号量、锁机制,性能下降;
线程不够灵活,操作系统持续地所有线程共享CPU时间,不管线程是否阻塞,出现忙等现象。
因此,协程的概念被提出,当然在python之前就有这个概念,来看看它是怎么解决上面这些问题的:
相比线程切换,协程上下文切换代价更小。
协程是单线程的,不存在多线程互斥访问共享变量,不用锁、信号量等机制
协程非常灵活,当出现I/O阻塞时,就去切换任务,I/O完成再唤醒,这就是所谓的异步I/O,也就是避免了操作系统书上常说的cpu计算快,磁盘存取慢导致的速度不匹配。
简单概括,协程是为了代替线程而出现的。在用户态,实现的上下文切换技术,只能跑在单线程上。
嘶,协程只能跑在单线程,这是不是看起来很笨重,这是为了避免互斥访问共享变量做的妥协。因此协程不适合做计算密集型任务,它适合做I/O密集型任务,你看asyncio翻译过来就是异步I/O,讲的就是I/O密集任务。
那协程就只能利用单核吗?当然不是,结合多进程和协程,就既能发挥多核的威力,又能利用协程的高效。
讲完协程的概念,下面来看看python是如何实现的吧。
实现化协程对象的方法有:
- yield关键字
- yield from关键字
- async、await关键字(3.5以上,目前官方推荐的写法)
1.yield关键字
首先yield和yield from都是asyncio库的基础,我们不会拿yield来实现异步I/O,可以通过yield理解用户态下是如何切换上下文的。
协程是上下文切换技术,我们想想python有哪些技术也用到了上下文切换,没错,generator生成器!
生成器执行到yield,保存上下文,退出,再次执行,恢复上下文。所以yield关键字就想当然地用来实现协程对象。yield实现协程在《流畅的python》的第十六章中讲的非常清楚。
跟生成器很大的不同点是:yield实现的线程对象,可以通过send接受数据,发送的数据会成为yield表达式的值。在协程中可以这么写:a = yield result 或 a = yield 或 yield result
。yield右边表达式就是要返回的值,如果右边没有表达式,则返回None。send发送的数据赋值给yield等号左边变量。
举个例子:
def simple_coro2(a):
print('-> Started: a=',a)
b = yield a # 相当于是先返回a,暂停,直到next方法调用后,b赋值为send发送过来的数据,再执行到下一个yield暂停
print('-> Received: b =',b)
c = yield a+b # 先返回a+b,暂停,直到next方法调用后,c赋值为send发送过来的数据,再执行到下一个yield暂停
print('-> Received: c=',c)
my_coro2=simple_coro2(14) # 实例化一个协程对象
from inspect import getgeneratorstate # 得到状态的函数
# 在命令行输入
>>>getgeneratorstate(my_coro2) # 'GEN_CREATED'
>>>next(my_coro2) # 14
>>>getgeneratorstate(my_coro2) # 'GEN_SUSPENDED',运行next语句做预激,状态改变为暂停状态,方可进行send数据
>>>my_coro2.send(28) # 42
>>>my_coro2.send(99) # 99,并报StopIteration
>>>getgeneratorstate(my_coro2) # 'GEN_CLOSED'
用了yield的函数就是一个协程对象,它有四种状态:'GEN_CREATED'等待开始,'GEN_RUNNING'解释器正在执行,'GEN_SUSPENDED'在yield表达式处暂停,'GEN_CLOSED'执行结束。可以用inspect.getgeneratorstate调用。
注意仅在yield暂停状态才可以发送数据,因此需要预激,把'GEN_CREATED'状态转换为'GEN_SUSPENDED'。next(my_coro2)
这行代码,就是在做预激,next后,执行到yield a,等待状态。每次都需要预激,不免显得很麻烦,我们可以实现一个装饰器。
from functools import wraps
def coroutine(func):
@wraps(func)
def primer(*arg,**kwargs):
gen=func(*arg,**kwargs)
next(gen) # 预激
return gen
return primer
通过下面这张流程图再回顾一下上面代码,划分成三个阶段,各个阶段都是在yield表达式中结束,而且下个阶段从那一行代码开始。需要特别提示的是,b=yield a
被拆开成两部分:1.先执行yield右边表达式, 返回a,暂停;2.调用next后,再执行b赋值为yield(即send传递过来的参数)。
上述的例子是一个很简单的协程的例子,只有一个任务(函数)在调度,只是想借着说明yield如何实现,在真正场景中,我们很少需要这样编写代码,asyncio都帮你封装好了,当然学习多多益善。
总之yield就是暂停、恢复的流程控制工具。还有一些终止协程和异常处理没讲,详见《流程的python》。
2.yield from关键字
yield from只是多了一个from,但和yield功能却完全不同,yield是控制流程工具,那yield from(3.3引入的关键字)就是打开了一个双向通道,把外层调用方和最内层的子生成器连接起来。本质还是上下文切换,程序调度技术。
要介绍几个新的术语:
- 委派生成器:包含yield from表达式的生成器函数
- 子生成器:yield from表达式中获取的生成器。准确来说,支持遍历的迭代器都行,但主要用在生成器。
- 调用方:调用委派生成器的客户端代码
例子:
from collections import namedtuple
Result = namedtuple('Result', 'count average')
# 子生成器
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None: # 退出条件
break
total += term
count += 1
average = total/count
return Result(count, average) # return后给yield from
# 委派生成器
def grouper(results, key):
while True:
results[key] = yield from averager()
# 调用方
def main(data):
results = {}
for key, values in data.items():
group = grouper(results, key) # 实例化委派生成器
next(group) # next
for value in values:
group.send(value) # 发送数据,其实是传给了子生成器的yield!
group.send(None) # 退出的条件,子生成器接受到None,退出for循环
report(results)
# output report
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))
data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
main(data)
需要注意的是:
- 调用方send的数据,不由委派生成器接受,传递给子生成器的yield语句!同样子生成器的产出也是直接给调用方(除了return)
- 当子生成器遇到return,抛出stopIteration,委派生成器的yield from就会得到子生成器的return值,赋值给results[key]。
用一张图片来形象概括一下:
因此我们可以说yield from打开了一个双向通道,把外层调用方和最内层的子生成器连接起来。本质还是上下文切换,程序调度技术。遇到yield from,则把程序控制权给子生成器,实现了程序调度。
但是到目前为止,还无法实现异步I/O,因为yield from要等你return了,我才能继续执行,也就是出现忙等。asyncio就是多了一个功能,如果yield from遇到I/O阻塞,去执行其他任务,不忙等。
这些都是asyncio的基础,体会一下协程如何切换上下文,如何调度,如何通信。单纯从使用角度,不一定会用到这些。
3.async、await关键字
这是python3.5引入的关键字,也是现在最推荐的实现协程对象的方法。因为之前yield、yield from容易和生成器混淆,不能一眼看出这是在做协程。
在定义协程对象时,用async def 函数名
async def func():
pass
coroutine=func() # 实例化协程对象
用await代替yield from,功能一模一样,程序调度
之前讲yield from后面加子生成器,这里await扩充一下,后面跟可等待对象(协程对象、Future、Task对象),他们都可能发生I/O等待。Future、Task对象后面讲,先按下不表。
4.asyncio
asyncio是python3.4引入的库,翻译过来就是异步I/O。来看一个例子:
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(4)
tasks = [
asyncio.ensure_future( func1() ), # 实例化一个任务Task
asyncio.ensure_future( func2() )
]
loop = asyncio.get_event_loop() # 实例化一个事件循环,简单理解为死循环,直到全部任务完成break
loop.run_until_complete(asyncio.wait(tasks))
asyncio.sleep(2)
模拟了I/O操作,不可以用time.sleep,会把所有都sleep了。上述代码很简单,把两个Task实例加入到事件循环中。剩下就交给asyncio库。
如果yield from遇到I/O操作,就会切换其他任务,这就是之前讲的yield from起到了程序调度的作用!所以asyncio的执行顺序是未知的,是异步的!
asyncio定义了一个装饰器@asyncio.coroutine
,在async def关键字出现之前,用此装饰器来特指协程对象,本身不实现功能,不会预激协程。由于async关键字的出现,在3.8后被淘汰。
asyncio主要包括以下数据类型:
- Event loop
- Task
- Future
4.1Event loop
事件循环其实就是一个loop,并且帮你判断什么时候推出。
# 伪代码,参考https://zhuanlan.zhihu.com/p/137057192
任务列表 = [ 任务1, 任务2, 任务3,... ]
while True:
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
for 就绪任务 in 已准备就绪的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除 已完成的任务
如果 任务列表 中的任务都已完成,则终止循环
常用方法:
loop = asyncio.get_event_loop()
:创建loop.run_until_complete(future)
:运行直到future结束。如果参数是协程对象,被隐式转换为Task对象,返回 Future 的结果或引发其异常。
结合起来:
import asyncio
async def func():
print("协程内部代码")
# 实例化一个协程对象。
result = func()
loop = asyncio.get_event_loop() # 创建一个事件循环
loop.run_until_complete(result) # 将协程当做任务提交到事件循环的任务列表中,协程执行完成之后终止。
# 可以用asyncio.run(result)代替上面两行,3.7引入的,本质一样
上面是一个协程对象,那自然如果遇到阻塞,Event loop就不能进行切换任务了,所以多任务下才能实现异步I/O。这就需要Task对象
4.2Task
创建Task:asyncio.ensure_future()
、asyncio.create_task()
(3.7后才引入)
例子1:
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main开始")
# 将协程封装到Task对象中并添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)
task_list = [
asyncio.create_task(func(), name="n1"),
asyncio.create_task(func(), name="n2")
]
print("main结束")
# 当执行某协程遇到IO操作时,事件循环会自动切换其他任务。
done, pending = await asyncio.wait(task_list, timeout=None)
print(done, pending)
asyncio.run(main())
其中await后面不可以直接接task_list,需要用asyncio.wait
将列表封装为一个协程,wait的参数是由协程对象或者Task组成的集合,如果为协程对象,则会隐式转换成Task。
因此可以直接改为asyncio.wait([fun(),fun()], timeout=None)
。
返回一组tuple(done,pending),done表示已完成的task对象,pending表示未完成的task对象。
例子2:
import asyncio:
async def func():
print("执行协程函数内部代码")
# 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
response = await asyncio.sleep(2)
print("IO请求结束,结果为:", response)
coroutine_list = [func(), func()]
# coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ] 错!
# 此处不能直接 asyncio.create_task,因为此时事件循环还未创建。
# asyncio.wait会对协程进行隐式转换为Task对象
done,pending = asyncio.run( asyncio.wait(coroutine_list) )
4.3Future
Task就继承于Future,一般不用Future,还记得concurrent.futures库中也有一个Future,封装未完成的任务,当既需要使用concurrent.futures,又要使用asyncio时,我们可以利用Future进行同步,这一部分还没学完,见asyncio文档
参考:
《流畅的python》