深入理解asyncio(一)
/ / / 阅读数:20234前言
这几天看 asyncio 相关的 pycon 视频又重温了 asyncio 的官方文档,收获很多。之前 asyncio 被吐槽的一点就是文档写的不好,Python 3.7 时 asyncio 的官方文档被 Andrew Svetlov 以及 Yury Selivanov 等核心开发者重写了,新的版本我觉得已经好很多了。在这里记录一下我对 asyncio 的一些理解。
核心概念
asyncio 里面主要有 4 个需要关注的基本概念
Eventloop
Eventloop 可以说是 asyncio 应用的核心,是中央总控。Eventloop 实例提供了注册、取消和执行任务和回调的方法。
把一些异步函数 (就是任务,Task,一会就会说到) 注册到这个事件循环上,事件循环会循环执行这些函数 (但同时只能执行一个),当执行到某个函数时,如果它正在等待 I/O 返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成 I/O 后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同 (Cooperative) 运行:这就是事件循环的目标。
Coroutine
协程 (Coroutine) 本质上是一个函数,特点是在代码块中可以将执行权交给其他协程:
❯ cat coro1.py import asyncio async def a(): print('Suspending a') await asyncio.sleep(0) print('Resuming a') async def b(): print('In b') async def main(): await asyncio.gather(a(), b()) if __name__ == '__main__': asyncio.run(main()) |
这里面有 4 个重要关键点:
- 协程要用
async def
声明,Python 3.5 时的装饰器写法已经过时,我就不列出来了。 - asyncio.gather 用来并发运行任务,在这里表示协同的执行 a 和 b2 个协程
- 在协程 a 中,有一句
await asyncio.sleep (0)
,await 表示调用协程,sleep 0 并不会真的 sleep(因为时间为 0),但是却可以把控制权交出去了。 - asyncio.run 是 Python 3.7 新加的接口,要不然你得这么写:
loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close() |
好了,我们先运行一下看看:
❯ python coro1.py Suspending a In b Resuming a |
看到了吧,在并发执行中,协程 a 被挂起又恢复过。
Future
接着说 Future,它代表了一个「未来」对象,异步操作结束后会把最终结果设置到这个 Future 对象上。Future 是对协程的封装,不过日常开发基本是不需要直接用这个底层 Future 类的。我在这里只是演示一下:
In : def c(): ...: print('Inner C') ...: return 12 ...: In : future = loop.run_in_executor(None, c) # 这里没用await,None 表示默认的 executor Inner C In : future # 虽然c已经执行了,但是状态还是 pending。 Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]> In : future.done() # 还没有完成 Out: False In : for a in dir(future): ...: if not a.startswith('_'): ...: print(a) ...: add_done_callback cancel cancelled done exception get_loop remove_done_callback result set_exception set_result |
可以对这个 Future 实例添加完成后的回调 (add_done_callback)、取消任务 (cancel)、设置最终结果 (set_result)、设置异常 (如果有的话,set_exception) 等。现在我们让 Future 完成:
In : await future Out: 12 In : future Out: <Future finished result=12> In : future.done() Out: True In : future.result() Out: 12 |
看到了吧,await 之后状态成了 finished。这里顺便说一下,一个对象怎么样就可以被 await(或者说怎么样就成了一个 awaitable 对象)呢?给类实现一个__await__方法,Python 版本的 Future 的实现大概如下:
def __await_(self): if not self.done(): self._asyncio_future_blocking = True yield self if not self.done(): raise RuntimeError("await wasn't used with future") return self.result() |
这样就可以await future
了,那为什么await future
后 Future 的状态就能改变呢,这是因为用loop.run_in_executor
创建的 Future 注册了一个回调(通过asyncio.futures.wrap_future
,加了一个_call_set_state
回调,有兴趣的可以通过延伸阅读链接 2 找上下文)。
__await__
里面的yield self
不要奇怪,主要是为了兼容__iter__
,给旧的yield from
用:
In : future = loop.run_in_executor(None, c) Inner C In : future Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]> In : def spam(): ...: yield from future ...: In : s = spam() In : next(s) Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]> |
新的替代yield from
的用法 await 必须在异步函数 (用 async def 申明) 中使用:
In : def spam(): ...: await future ...: File "cell_name", line 5 SyntaxError: 'await' outside async function |
Task
Eventloop 除了支持协程,还支持注册 Future 和 Task2 种类型的对象,那为什么要存在 Future 和 Task 这 2 种类型呢?
先回忆前面的例子,Future 是协程的封装,Future 对象提供了很多任务方法 (如完成后的回调、取消、设置任务结果等等),但是开发者并不需要直接操作 Future 这种底层对象,而是用 Future 的子类 Task 协同的调度协程以实现并发。
Task 非常容易创建和使用:
# 或者用task = loop.create_task(a()) In : task = asyncio.ensure_future(a()) In : task Out: <Task pending coro=<a() running at /Users/dongwm/mp/2019-05-22/coro1.py:4>> In : task.done() Out: False In : await task Suspending a Resuming a In : task Out: <Task finished coro=<a() done, defined at /Users/dongwm/mp/2019-05-22/coro1.py:4> result=None> In : task.done() Out: True |
asyncio 并发的正确 / 错误姿势
在代码中使用 async/await 是不是就能发挥 asyncio 的并发优势么,其实是不对的,我们先看个例子:
async def a(): print('Suspending a') await asyncio.sleep(3) print('Resuming a') async def b(): print('Suspending b') await asyncio.sleep(1) print('Resuming b') async def s1(): await a() await b() |
有 2 个协程 a 和 b,分别 sleep1 秒和 3 秒,如果协程可以并发执行,那么执行时间应该是 sleep 最大的那个值 (3 秒),现在它们都在 s1 协程里面被调用。大家先猜一下 s1 会运行几秒?
我们写个小程序验证一下:
def show_perf(func): print('*' * 20) start = time.perf_counter() asyncio.run(func()) print(f'{func.__name__} Cost: {time.perf_counter() - start}') |
大家注意我这个时间计数用的方法,没有用 time.time,而是用了 Python 3.3 新增的 time.perf_counter 它是现在推荐的用法。我们在 IPython 里面验证下:
In : from coro2 import * In : show_perf(s1) ******************** Suspending a Resuming a Suspending b Resuming b s1 Cost: 4.009796932999961 |
看到了吧,4 秒!!!,相当于串行的执行了 (sleep 3 + 1)。这是错误的用法,应该怎么用呢,前面的 asyncio.gather 就可以:
async def c1(): await asyncio.gather(a(), b()) In : show_perf(c1) ******************** Suspending a Suspending b Resuming b Resuming a c1 Cost: 3.002452698999832 |
看到了吧,3 秒!另外一个是 asyncio.wait:
async def c2(): await asyncio.wait([a(), b()]) In : show_perf(c2) ... c2 Cost: 3.0066957049998564 |
同样是 3 秒。先别着急,gather 和 wait 下篇文章还会继续对比。还有一个方案就是用 asyncio.create_task:
async def c3(): task1 = asyncio.create_task(a()) task2 = asyncio.create_task(b()) await task1 await task2 async def c4(): task = asyncio.create_task(b()) await a() await task In : show_perf(c3) ... c3 Cost: 3.002332438999929 In : show_perf(c4) ... c4 Cost: 3.002270970000154 |
都是 3 秒。asyncio.create_task 相当于把协程封装成 Task。不过大家要注意一个错误的用法:
async def s2(): await asyncio.create_task(a()) await asyncio.create_task(b()) In : show_perf(s2) ... s2 Cost: 4.004671427999938 |
直接 await task 不会对并发有帮助 *。asyncio.create_task 是 Python 3.7 新增的高阶 API,是推荐的用法,其实你还可以用 asyncio.ensure_future 和 loop.create_task:
async def c5(): task = asyncio.ensure_future(b()) await a() await task async def c6(): loop = asyncio.get_event_loop() task = loop.create_task(b()) await a() await task In : show_perf(c5) ... c5 Cost: 3.0033873750003295 In : show_perf(c6) ... c6 Cost: 3.006120122000084 |
到这里,我们一共看到 2 种错误的,6 种正确的写法。你学到了么?
代码目录
本文代码可以在 mp 项目 找到
c3 中先生成 task1、task2 然后分别 await,就有并发效果,但是 s2 中二次直接 await asyncio.create_task (a ()),却没有并发效果 一直没有搞懂,这个是为什么呢?