前言

 

什么是I/O?

I/O中文意思是输入/输出,英文意思是Input/Output,我们平时所说的I/O操作其是指2个动作3个步骤

步骤1:输入指令  

步骤2:等待结果返回

步骤3:结果返回之后输入

在使用Python编程时无论是多线程还是协程技术都是围绕如何充分利用步骤2的等待时间来展开。

如果I/O操作时Input和Output这两个动作就是异步的(异步I/O),那我们就可以在Python中使用 asyncio模块(ayncy+await关键字+事件循环机制)实现协程。

如果I/O是同步的,我们只能使用多线程和多进程花费更多的系统资源去实现异步效果来规避步骤2的等待时间。

还记得gevent协程模块里面的monkey_patch吗?

它就是帮我们把现有的同步I/O模块改装成异步I/O模块的。

 

什么是协程?

线程和进程并非操作系统提供的,协程则是程序员创造出来的微线程。 

协程是1种用户态的上下文切换技术,在遇到I/O操作时,协程可以在1个线程的不同代码块中来回切换。

其宗旨和线程一样是为了规避I/O操作时 等待结果返回的步骤 ,和多线程相比协程由于是单线程,它使用的资源比较小。

async def print_hello():
    while True:
        print("1")
        await asyncio.sleep(1)
        print("2")


async def print_goodbye():
    while True:
        print("3")
        await asyncio.sleep(1)
        print("4")

#async 函数返回1个协程对象
corutine1=print_hello()
corutine2=print_goodbye()
#获取事件循环
loop=asyncio.get_event_loop()
#开始监听事件
loop.run_until_complete(asyncio.gather(corutine1,corutine2))
协程示例

 

协程的缺陷?

协程依赖异步I/O和事件循环,目前大部分Python模块还不支持异步I/O。

 

使用协程的意义?

前面我们提到I/O操作包含:2个动作3个步骤。

说得具体一些应该是当协程遇到I/O操作时,执行完input动作之后,不会一直等待结果返回,不会阻塞。

而是在等待结果返回的同时,去执行其他代码块,待结果返回之后再回到原处获取output输出结果。

如果1个线程遇到I/O操作时不执行Input动作而是直接切换到其他代码块,算不上协程!

如果1个线程遇到I/O操作时执行了Input动作之后一直等待output输出结果,也算不上协程这还是同步!

如果我们不理解I/O操作的2个动作和3个步骤,就无法真正理解协程的意义到底在哪?

协程需要借助事件循环机制实现对I/O的循环检测,才能实时得知哪个代码块需要执行Input操作?哪个代码块需要执行Output操作?实现自动切换。

如果我们不借助事件循环机制就无法对I/O操作进行循环检测,也无法实现协程。

 

实现协程的方式

既然协程的1个重要特征是在1个线程的不同代码块中来回切换。

在Python中我们的代码是由上至下顺序运行的,那我们怎么改变这种代码执行顺序?实现来回切换呢?

1.greenlet 

greenlet是Python中1个第三方模块需要pip intall greenlet,大名鼎鼎的gevent模块正是基于它进行协程间的切换。

from greenlet import greenlet


def coroutine1():
    print(1)  # 第2步:输出1
    gr2.switch()  # 第3步:切换到coroutine2
    print(2)  # 第6步输出2
    gr2.switch()  # 第7步切换再切换到coroutine2


def coroutine2():
    print(3)  # 第4步:输出3
    gr1.switch()  # 第5步:切换回coroutine1
    print(4)  # 第8步:输出4


gr1 = greenlet(coroutine1)
gr2 = greenlet(coroutine2)
gr1.switch()  # 第1步:去执行coroutine1的代码

 

2.yield关键字

我们都知道通过yield关键字可以把函数编程生成器,也可以使用yield from关键字跳转到其他函数执行之后,再回到原处进行执行

def func1():
    yield 1  # 第1步:1
    yield from func2()  # 第2步:切换到func2去执行
    yield 2  # 第5步:2


def func2():
    yield 3  # 第3步:3
    yield 4  # 第4步:4


# 实现生成器
f1 = func1()

# 开始迭代生成器
for item in f1:
    print(item)
# 输出: 1  3  4  2

 

3.asyncio装饰器

我以上所述的例子其实算不上真正的协程,因为协程不仅有切换的特性,还可以在于到I/O等待的时候不被阻塞。

在Python3.4版本之后我们可以使用asyncio装饰器模式实现协程。

import asyncio


@asyncio.coroutine
def func1():
    print(1)  # 第1步:打印1
    yield from asyncio.sleep(2)  # 第2步遇到I/O等待时间了先不在这里死等了
    print(2)  # 第5步:睡醒之后去打印2


@asyncio.coroutine
def func2():
    print(3)  # 第3步:去打印3
    yield from asyncio.sleep(2)  # 第4步又遇到I/O等待时间了,先不在这里傻等了,看看func1睡醒了吗?
    print(4)  # 第6步func2也睡醒了继续打印4


tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# 输出1 3 2 4 总耗时2秒

 

4.async & await关键字

Python3.5版本之后支持了async个await关键字语法实现协程,就像JavaScript/C#,这种语法比较简洁,推荐使用。

import asyncio


async def func1():
    print(1)  # 第1步:打印1
    await asyncio.sleep(2)  # 第2步遇到I/O等待时间了先不在这里死等了
    print(2)  # 第5步:睡醒之后去打印2


async def func2():
    print(3)  # 第3步:去打印3
    await asyncio.sleep(2)  # 第4步又遇到I/O等待时间了,先不在这里傻等了,看看func1睡醒了吗?
    print(4)  # 第6步func2也睡醒了继续打印4


tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# 输出1 3 2 4 总耗时2秒

 

同步 VS 协程

协程的意义在于充分利用I/O操作中的等待时间,下面我们来体验一下协程。

同步方式

import requests


def download_image(url):
    print("开始下载", url)
    response = requests.get(url=url)
    print("下载完成")
    file_name = url.split("_")[-1]
    with open(file=file_name, mode="wb") as f:
        f.write(response.content)


url_list = [
    'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
    'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
    'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
for item in url_list:
    download_image(url=item)

 协程异步的方式

pip install aiohttp -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

 代码

import asyncio

import aiohttp


async def fetch(session, url):
    print("发送请求:", url)
    async with session.get(url, verify_ssl=False) as response:
        content = await response.content.read()
        file_name = url.rsplit('_')[-1]
        with open(file_name, mode='wb') as file_object:
            file_object.write(content)


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
            'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
            'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
        ]
        tasks = [asyncio.ensure_future(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

 

基于协程进行异步编程

下面我们开始深入学习asyncio模块来实现基于协程的异步编程。

 

事件循环

事件循环就是一个while循环,去循环检测任务列表中的任务状态,等到任务列表中的任务全部为已完成时,循环终止。

任务列表 = [ 任务1, 任务2, 任务3,... ]
while True:
    可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行''已完成'的任务返回
    for 就绪任务 in 已准备就绪的任务列表:
        执行已就绪的任务
    for 已完成的任务 in 已完成的任务列表:
        在任务列表中移除 已完成的任务
    如果 任务列表 中的任务都已完成,则终止循环

 在编写程序时候可以通过如下代码来获取和创建事件循环

import asyncio
#获取操作系统的事件循环
loop = asyncio.get_event_loop()
#添加任务到事件循环的任务列表
loop.run_until_complete(asyncio.wait(任务))

 

async关键字

async关键字可以声明1个协程函数

async def func():

需要注意的是在我们调用协程函数时,协程函数中的代码不会像普通函数一样直接执行。

而是返回1个协程对象,此时我们借助事件循环执行协程对象,从而执行协程函数中的代码。

import asyncio


async def func():
    print("代码执行")


# 协程函数生成协程对象
routine_obj = func()
# 获取事件循环
loop = asyncio.get_event_loop()
# 借助事件循环执行协程对象才能执行func中的代码
loop.run_until_complete(routine_obj)

在Python3.7之后我可以直接使用async.run(协程对象)

import asyncio


async def func():
    print("代码执行")


# 协程函数生成协程对象
routine_obj = func()

# 获取事件循环
# loop = asyncio.get_event_loop()
# 借助事件循环执行协程对象才能执行func中的代码
# loop.run_until_complete(routine_obj)
# 在Python3.7之后我们可以直接使用asyncio.run()替代以上2行代码
asyncio.run()

 

await关键字

await用于等待当前代码块的执行结果,在同1个任务中,当前代码块不执行完,当前任务中的代码将不会往下执行,因为当前任务中的下一个代码块需要上1个代码块的执行结果。

在执行完了I/O操作中的Input动作之后当前代码块将被挂起,线程去执行事件循环任务列表里其他任务。

等事件循环检测到任务已完成也就是I/O操作Output有输出结果返回之后,再切换回任务执行。

协程的切换是根据事件循环检测到任务列表中不同任务的状态 进行切换的,二者配合缺一不可。

await 后面可以是 协程对象/task对象/future对象,以下我将逐一探索。

 

协程对象

协程函数()调用执行之后返回协程对象。

import asyncio


async def func():
    print("代码执行")
    resopnse=await asyncio.sleep(2)
    print("结束",resopnse)


# 协程函数生成协程对象
routine_obj = func()

# 获取事件循环
loop = asyncio.get_event_loop()
# 借助事件循环执行协程对象才能执行func中的代码,由于事件循环列表中只有1个任务没有其他任务,所以无法发挥协程的优势。
loop.run_until_complete(routine_obj)

如果在事件循环中注册了1个任务(协程),那我们在这1个任务中使用await,无疑会拖慢程序的执行速度。还是串行还不如不使用协程。

import asyncio


async def task1():
    print("task1开始")
    await asyncio.sleep(2)
    print("task1结束")
    return "task1返回值"


async def task():
    print("执行task协程函数内部代码")
    response1 = await task1()
    print("执行task协程函数内部代码获取task1de1返回值", response1)
    response2 = await task1()
    print("执行task协程函数内部代码获取task1de1返回值", response2)


# 协程函数生成协程对象
routine_obj = task()

# 获取事件循环
loop = asyncio.get_event_loop()
# 借助事件循环执行协程对象才能执行func中的代码,由于事件循环列表中只有1个任务没有其他任务,所以无法发挥协程的优势。
loop.run_until_complete(routine_obj)

 

Task对象

 以上我们得出结论事件循环列表中只有1个任务存在,程序执行顺序还是串行,甚至会拖慢执行速度。

那我们怎么给事件循环列表添加上多个任务呢?这就需要使用Task对象。

在Python3.7之后:我们可以使用asyncio.create_task(协程对象)      创建Task对象。

在Python3.7之前:我们可以使用asyncio.ensure_future(协程对象)   创建Task对象。

import asyncio


async def func():
    print("任务开始")
    await asyncio.sleep(2)
    print("任务结束")
    return "返回值"


async def main_func():
    print("main_func 开始")
    # 在Python3.7之后:我们可以使用asyncio.create_task(协程对象)创建task对象
    # Python3.7之前:我们使用asyncio.ensure_future(协程对象)创建task对象
    # 2.使用asyncio.ensure_future(协程对象)创建1个task1任务并添加到事件循环任务列表。
    task1 = asyncio.ensure_future(func())
    # 3.使用asyncio.ensure_future(协程对象)创建1个task2任务并添加到事件循环任务列表。
    task2 = asyncio.ensure_future(func())
    # 现在事件循环列表中1共3个任务:当执行这些任务时遇到I/0操作会自动切换到其他任务。
    # await只在1个协程的代码块中等待代码块执行完毕
    print("main结束")
    response1 = await task1
    response2 = await task2
    print(response1, response2)


loop = asyncio.get_event_loop()
# 1.事件循环列表中添加main_func任务
loop.run_until_complete(main_func())
# 执行了2个任务总耗时2秒

 

更加简洁的方式

我们可以使用await asyncio.wait(列表,timeout=None)API把task对象列表添加1次性全部添加到事件循环任务列表中,

import asyncio


async def func():
    print("协程开始")
    await asyncio.sleep(2)
    print("协程结束")
    return "返回值"


async def main_func():
    print("开始添加协程到事件循环列表")
    task_list = [
        asyncio.ensure_future(func()),
        asyncio.ensure_future(func())
    ]
    print("添加协程到事件循环列表完成")
    # 使用await asyncio.wait(列表,timeout=None)
    # 等待task_list中的协程们全部执行完毕
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done, pending)


loop = asyncio.get_event_loop()
loop.run_until_complete(main_func())

更加简洁的方式

以上我们相当于在事件循环中添加了3个协程其中main_func协程专门用于添加Task任务。

import asyncio


async def func():
    print("协程开始")
    await asyncio.sleep(2)
    print("协程结束")
    return "返回值"


loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(asyncio.wait([
    asyncio.ensure_future(func()),
    asyncio.ensure_future(func())
]))
print(done, pending)

最简洁

import asyncio


async def func():
    print("协程开始")
    await asyncio.sleep(2)
    print("协程结束")
    return "返回值"


loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(asyncio.wait([
    func(),
    func()
]))
print(done, pending)

 

asyncio中的Future对象

Future对象是Task对象的父类,也就是Task对象封装了Future功能。

当我们await Task对象的执行结果时,只有获取到Task对象的执行结果代码才会往下走。那么这个过程是怎样的呢?

Future对象内部会有1个__state__字段,当future对象的state为Finshed状态时,Task返回执行结果。

如果future的状态一直没有finshed,task对象用于不会返回结果,改协程会一直阻塞。

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # # 创建一个任务(Future对象),这个任务什么都不干。
    fut = loop.create_future()
    # 等待任务最终结果(Future对象),没有结果则会一直等下去。
    await fut
asyncio.run(main())

实例2

一旦我们给future对象设置了值fut.set_result("666"),await将立即接收到执行结果。

import asyncio
async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")
async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
    fut = loop.create_future()
    # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
    # 即手动设置future任务的最终结果,那么fut就可以结束了。
    await loop.create_task(set_after(fut))
    # 等待 Future对象获取 最终结果,否则一直等下去
    data = await fut
    print(data)
asyncio.run(main())

 

concurrent模块中的future对象

 在concrrent模块也有另1个future对象,让我们获取线程/进程的执行结果。

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def task(value):
    time.sleep(2)
    print(value)
    return "123"

#创建1个线程池
threading_pool=ThreadPoolExecutor(max_workers=5)
#创建1个进程池
#process_pool=ProcessPoolExecutor(max_workers=5)

for i in range(10):
    future_obj=threading_pool.submit(task,i)
    print(future_obj)

 

我们在使用async+await关键字基于协程进行异步编程的前提是使用的Python模块得支持异步I/O功能

Python的异步I/O是未来的趋势,但目前来说Python中大部分模块暂时还不支持I/O异步,怎么办呢?

我们可以使用loop.run_in_executor(None, task, url),

    # 1. Run in the default loop's executor ( 默认ThreadPoolExecutor )
    # 第1步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象
    # 第2步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。
    # 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。

所以这种方式和协程相比无疑会使用更多线程资源。

但是这种await+async+线程/进程池混搭的方式却是一种实时获取多个线程执行结果的方式。

import asyncio

import time


# 这里写原来的Python代码
def task(value):
    print("线程开始:", value)
    time.sleep(2)
    print("线程结束:", value)
    return "111"


async def download_image(url):
    # 遇到IO请求,自动化切换到其他线程
    loop = asyncio.get_event_loop()
    # 如果Python模块默认不支持异步操作,所以就使用线程池来配合实现了。
    future = loop.run_in_executor(None, task, url)
    response = await future
    print('结果:', response)


if __name__ == '__main__':
    url_list = [
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]
    # 创建1个协程对象列表
    tasks = [download_image(url) for url in url_list]
    loop = asyncio.get_event_loop()
    # 把协程对象列表 添加到事件循环列表
    loop.run_until_complete(asyncio.wait(tasks))

使用场景

import asyncio

import requests


async def download_image(url):
    # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
    print("开始下载:", url)
    loop = asyncio.get_event_loop()
    # requests模块默认不支持异步操作,所以就使用线程池来配合实现了。
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print('下载完成')
    # 图片保存到本地文件
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)


if __name__ == '__main__':
    url_list = [
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]
    # 创建1个协程对象列表
    tasks = [download_image(url) for url in url_list]
    loop = asyncio.get_event_loop()
    # 把协程对象列表 添加到事件循环列表
    loop.run_until_complete(asyncio.wait(tasks))

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

参考

posted on 2021-02-08 11:18  Martin8866  阅读(413)  评论(1编辑  收藏  举报