python并发编程入门:(五)asyncio

其他内容见:

这是本系列的最后一篇文章,花了一个周末,终于可以收个尾。之前已经讲述过多线程、多进程的内容,见上文章链接。

在讲asyncio之前,先要介绍一个概念协程(Coroutine)。

协程是在用户态实现的上下文切换技术,因此操作系统不知道协程的存在。那么什么是上下文,简单来说就是栈、寄存器、命名空间等等。线程、进程就是操作系统帮你实现的上下文切换技术,你无需关心进程调度。


首先,协程不是由操作系统创建管理的,是在用户空间,由程序猿人为创造的。这与线程、进程不同。

其次,协程对标的是线程,代替多线程,准确来说是一种微线程。为什么要代替线程呢?

线程缺点:线程切换开销太大(相比进程还算小);为了同步互斥,线程要加信号量、锁机制,性能下降;
线程不够灵活,操作系统持续地所有线程共享CPU时间,不管线程是否阻塞,出现忙等现象。

因此,协程的概念被提出,当然在python之前就有这个概念,来看看它是怎么解决上面这些问题的:

相比线程切换,协程上下文切换代价更小。
协程是单线程的,不存在多线程互斥访问共享变量,不用锁、信号量等机制
协程非常灵活,当出现I/O阻塞时,就去切换任务,I/O完成再唤醒,这就是所谓的异步I/O,也就是避免了操作系统书上常说的cpu计算快,磁盘存取慢导致的速度不匹配。

简单概括,协程是为了代替线程而出现的。在用户态,实现的上下文切换技术,只能跑在单线程上。

嘶,协程只能跑在单线程,这是不是看起来很笨重,这是为了避免互斥访问共享变量做的妥协。因此协程不适合做计算密集型任务,它适合做I/O密集型任务,你看asyncio翻译过来就是异步I/O,讲的就是I/O密集任务。

那协程就只能利用单核吗?当然不是,结合多进程和协程,就既能发挥多核的威力,又能利用协程的高效。


讲完协程的概念,下面来看看python是如何实现的吧。

实现化协程对象的方法有:

  • yield关键字
  • yield from关键字
  • async、await关键字(3.5以上,目前官方推荐的写法)

1.yield关键字

首先yield和yield from都是asyncio库的基础,我们不会拿yield来实现异步I/O,可以通过yield理解用户态下是如何切换上下文的。

协程是上下文切换技术,我们想想python有哪些技术也用到了上下文切换,没错,generator生成器!

生成器执行到yield,保存上下文,退出,再次执行,恢复上下文。所以yield关键字就想当然地用来实现协程对象。yield实现协程在《流畅的python》的第十六章中讲的非常清楚。

跟生成器很大的不同点是:yield实现的线程对象,可以通过send接受数据,发送的数据会成为yield表达式的值。在协程中可以这么写:a = yield result 或 a = yield 或 yield result 。yield右边表达式就是要返回的值,如果右边没有表达式,则返回None。send发送的数据赋值给yield等号左边变量。

举个例子:

def simple_coro2(a):
    print('-> Started: a=',a)
    b = yield a # 相当于是先返回a,暂停,直到next方法调用后,b赋值为send发送过来的数据,再执行到下一个yield暂停
    print('-> Received: b =',b)
    c = yield a+b # 先返回a+b,暂停,直到next方法调用后,c赋值为send发送过来的数据,再执行到下一个yield暂停
    print('-> Received: c=',c)

my_coro2=simple_coro2(14) # 实例化一个协程对象
from inspect import getgeneratorstate # 得到状态的函数

# 在命令行输入
>>>getgeneratorstate(my_coro2)  #  'GEN_CREATED'
>>>next(my_coro2)               #  14
>>>getgeneratorstate(my_coro2)  #  'GEN_SUSPENDED',运行next语句做预激,状态改变为暂停状态,方可进行send数据
>>>my_coro2.send(28)            #  42
>>>my_coro2.send(99)            #  99,并报StopIteration
>>>getgeneratorstate(my_coro2)  #  'GEN_CLOSED'

用了yield的函数就是一个协程对象,它有四种状态:'GEN_CREATED'等待开始,'GEN_RUNNING'解释器正在执行,'GEN_SUSPENDED'在yield表达式处暂停,'GEN_CLOSED'执行结束。可以用inspect.getgeneratorstate调用。

注意仅在yield暂停状态才可以发送数据,因此需要预激,把'GEN_CREATED'状态转换为'GEN_SUSPENDED'。next(my_coro2) 这行代码,就是在做预激,next后,执行到yield a,等待状态。每次都需要预激,不免显得很麻烦,我们可以实现一个装饰器。

from functools import wraps

def coroutine(func):
    @wraps(func)
    def primer(*arg,**kwargs):
        gen=func(*arg,**kwargs)
        next(gen) # 预激
        return gen
    return primer

通过下面这张流程图再回顾一下上面代码,划分成三个阶段,各个阶段都是在yield表达式中结束,而且下个阶段从那一行代码开始。需要特别提示的是,b=yield a被拆开成两部分:1.先执行yield右边表达式, 返回a,暂停2.调用next后,再执行b赋值为yield(即send传递过来的参数)。

来自《流畅的python》

上述的例子是一个很简单的协程的例子,只有一个任务(函数)在调度,只是想借着说明yield如何实现,在真正场景中,我们很少需要这样编写代码,asyncio都帮你封装好了,当然学习多多益善。

总之yield就是暂停、恢复的流程控制工具。还有一些终止协程和异常处理没讲,详见《流程的python》。

2.yield from关键字

yield from只是多了一个from,但和yield功能却完全不同,yield是控制流程工具,那yield from(3.3引入的关键字)就是打开了一个双向通道,把外层调用方和最内层的子生成器连接起来。本质还是上下文切换,程序调度技术。

要介绍几个新的术语:

  • 委派生成器:包含yield from表达式的生成器函数
  • 子生成器:yield from表达式中获取的生成器。准确来说,支持遍历的迭代器都行,但主要用在生成器。
  • 调用方:调用委派生成器的客户端代码

例子:

from collections import namedtuple

Result = namedtuple('Result', 'count average')


# 子生成器
def averager():  
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield  
        if term is None:  # 退出条件
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)  # return后给yield from


# 委派生成器
def grouper(results, key): 
    while True:  
        results[key] = yield from averager() 


# 调用方
def main(data):  
    results = {}
    for key, values in data.items():
        group = grouper(results, key)  # 实例化委派生成器
        next(group)  # next
        for value in values:
            group.send(value)  # 发送数据,其实是传给了子生成器的yield!
        group.send(None)  # 退出的条件,子生成器接受到None,退出for循环

    report(results)


# output report
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
              result.count, group, result.average, unit))

data = {
    'girls;kg':
        [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    'girls;m':
        [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    'boys;kg':
        [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    'boys;m':
        [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

if __name__ == '__main__':
    main(data)

需要注意的是:

  • 调用方send的数据,不由委派生成器接受,传递给子生成器的yield语句!同样子生成器的产出也是直接给调用方(除了return)
  • 当子生成器遇到return,抛出stopIteration,委派生成器的yield from就会得到子生成器的return值,赋值给results[key]。

用一张图片来形象概括一下:

因此我们可以说yield from打开了一个双向通道,把外层调用方和最内层的子生成器连接起来。本质还是上下文切换,程序调度技术。遇到yield from,则把程序控制权给子生成器,实现了程序调度。

但是到目前为止,还无法实现异步I/O,因为yield from要等你return了,我才能继续执行,也就是出现忙等。asyncio就是多了一个功能,如果yield from遇到I/O阻塞,去执行其他任务,不忙等。

这些都是asyncio的基础,体会一下协程如何切换上下文,如何调度,如何通信。单纯从使用角度,不一定会用到这些。

3.async、await关键字

这是python3.5引入的关键字,也是现在最推荐的实现协程对象的方法。因为之前yield、yield from容易和生成器混淆,不能一眼看出这是在做协程。

在定义协程对象时,用async def 函数名

async def func():
    pass
coroutine=func() # 实例化协程对象

用await代替yield from,功能一模一样,程序调度

之前讲yield from后面加子生成器,这里await扩充一下,后面跟可等待对象(协程对象、Future、Task对象),他们都可能发生I/O等待。Future、Task对象后面讲,先按下不表。

4.asyncio

asyncio是python3.4引入的库,翻译过来就是异步I/O。来看一个例子:

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2)  # 遇到IO耗时操作,自动化切换到tasks中的其他任务
    print(2)


@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
    print(4)


tasks = [
    asyncio.ensure_future( func1() ), # 实例化一个任务Task
    asyncio.ensure_future( func2() )
]

loop = asyncio.get_event_loop() # 实例化一个事件循环,简单理解为死循环,直到全部任务完成break
loop.run_until_complete(asyncio.wait(tasks)) 

asyncio.sleep(2)模拟了I/O操作,不可以用time.sleep,会把所有都sleep了。上述代码很简单,把两个Task实例加入到事件循环中。剩下就交给asyncio库。

如果yield from遇到I/O操作,就会切换其他任务,这就是之前讲的yield from起到了程序调度的作用!所以asyncio的执行顺序是未知的,是异步的!

asyncio定义了一个装饰器@asyncio.coroutine,在async def关键字出现之前,用此装饰器来特指协程对象,本身不实现功能,不会预激协程。由于async关键字的出现,在3.8后被淘汰。

asyncio主要包括以下数据类型:

  • Event loop
  • Task
  • Future

4.1Event loop

事件循环其实就是一个loop,并且帮你判断什么时候推出。

# 伪代码,参考https://zhuanlan.zhihu.com/p/137057192

任务列表 = [ 任务1, 任务2, 任务3,... ]

while True:
    可执行的任务列表已完成的任务列表 = 去任务列表中检查所有的任务'可执行''已完成'的任务返回

    for 就绪任务 in 已准备就绪的任务列表:
        执行已就绪的任务

    for 已完成的任务 in 已完成的任务列表:
        在任务列表中移除 已完成的任务

    如果 任务列表 中的任务都已完成则终止循环

常用方法:

  • loop = asyncio.get_event_loop() :创建
  • loop.run_until_complete(future) :运行直到future结束。如果参数是协程对象,被隐式转换为Task对象,返回 Future 的结果或引发其异常。

结合起来:

import asyncio

async def func():
    print("协程内部代码")

# 实例化一个协程对象。
result = func()

loop = asyncio.get_event_loop() # 创建一个事件循环
loop.run_until_complete(result) # 将协程当做任务提交到事件循环的任务列表中,协程执行完成之后终止。

# 可以用asyncio.run(result)代替上面两行,3.7引入的,本质一样

上面是一个协程对象,那自然如果遇到阻塞,Event loop就不能进行切换任务了,所以多任务下才能实现异步I/O。这就需要Task对象

4.2Task

创建Task:asyncio.ensure_future()asyncio.create_task()(3.7后才引入)

例子1:

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"


async def main():
    print("main开始")

    # 将协程封装到Task对象中并添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)
    task_list = [
        asyncio.create_task(func(), name="n1"),
        asyncio.create_task(func(), name="n2")
    ]

    print("main结束")

    # 当执行某协程遇到IO操作时,事件循环会自动切换其他任务。
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done, pending)

asyncio.run(main()) 

其中await后面不可以直接接task_list,需要用asyncio.wait 将列表封装为一个协程,wait的参数是由协程对象或者Task组成的集合,如果为协程对象,则会隐式转换成Task。

因此可以直接改为asyncio.wait([fun(),fun()], timeout=None)

返回一组tuple(done,pending),done表示已完成的task对象,pending表示未完成的task对象。

例子2:

import asyncio:

async def func():
    print("执行协程函数内部代码")

    # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
    response = await asyncio.sleep(2)

    print("IO请求结束,结果为:", response)

coroutine_list = [func(), func()]
# coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ]  错!
# 此处不能直接 asyncio.create_task,因为此时事件循环还未创建。

# asyncio.wait会对协程进行隐式转换为Task对象
done,pending = asyncio.run( asyncio.wait(coroutine_list) )

4.3Future

Task就继承于Future,一般不用Future,还记得concurrent.futures库中也有一个Future,封装未完成的任务,当既需要使用concurrent.futures,又要使用asyncio时,我们可以利用Future进行同步,这一部分还没学完,见asyncio文档


参考:

银角大王-武沛齐:asyncio异步编程,你搞懂了吗?

《流畅的python》

编辑于 2021-11-29 12:30