一. asyncio 模块基本操作
1.1 任务状态
上一节我们提到 asyncio 的任务使用协程对象作为参数创建。并且任务含有多种状态。下面我们使用一个简单的例子来说明任务的各种状态。
import time
import asyncio
@asyncio.coroutine
def do_some_work():
print('Coroutine start.')
time.sleep(3)
print('Coroutine finished.')
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine = do_some_work()
task = loop.create_task(coroutine) # 创建任务
print('task is instance of asyncio.Task?', 'yes' if isinstance(task, asyncio.Task) else 'No')
print(f'task state {task._state}')
loop.run_until_complete(task)
print(f'task state {task._state}')
end = time.time()
print(f'运行耗时: {end-start:.2f}')
if __name__ == '__main__':
main()
运行结果:事件循环的 create_task 方法可以创建任务,另外
asyncio.ensure_future 方法也可以创建任务,参数须为协程对象。
task 是 asyncio.Task 类的实例,创建 task 可以方便预激协程以及处理协程运行中遇到的异常。task 对象的 _state 属性保存当前任务的运行状态,任务的运行状态有 PENDING 和 FINISHED 两种。

1.2 async / await关键字
Python3.5 新增的 async 和 await 关键字可以用来定义协程函数。这两个关键字是一个组合,其作用等同于 @asyncio.coroutine 装饰器和 yield from 语句。以便将协程函数和生成器函数在语法上做出明显的区分。
import time
import asyncio
async def sleep_3s():
time.sleep(3)
async def do_some_work():
print('Coroutine start.')
# 通过await语法来挂起自身的协程,并等待另一个协程完成直到返回结果
await sleep_3s()
print('Coroutine finished.')
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine = do_some_work()
# 创建任务
task = loop.create_task(coroutine)
print('task is instance of asyncio.Task?', 'yes' if isinstance(task, asyncio.Task) else 'No')
print(f'task state {task._state}')
loop.run_until_complete(task)
print(f'task state {task._state}')
end = time.time()
print(f'运行耗时: {end - start:.2f}')
if __name__ == '__main__':
main()
1.3 绑定回调
假设协程包含一个 IO 操作(这几乎是肯定的),等它处理完数据后,我们希望得到通知,以便下一步数据处理。
这一需求可以通过向 future 对象添加回调实现。那什么是 future 对象呢?task 对象就是 future 对象,因为 asyncio.Task 是 asyncio.Future 的子类。
因此 task 对象也可以添加回调函数。回调函数的最后一个参数是 future 或 task 对象,通过该对象可以获取协程返回值。如果回调需要多个参数,可以使用 functools.partial 偏导函数传入。
import asyncio
import time
from functools import partial
async def coro_work():
print('coro_work -> Coroutine start.')
time.sleep(3)
print('coro-work -> Coroutine finished.')
def callback(name, task):
print(f'callback -> {task._state}')
print(f'callback -> {name}')
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine = coro_work()
task = loop.create_task(coroutine)
task.add_done_callback(partial(callback, 'Coroutine, Bye Bye~'))
loop.run_until_complete(task)
end = time.time()
print(f'运行耗时:{end - start:.2f}')
if __name__ == '__main__':
main()
运行结果:使用 async 关键字替代 asyncio.coroutine 装饰器创建协程函数。callback 为回调函数,协程终止后需要运行的代码写入回调函数,回调函数的参数有要求,最后一个位置参数须为 task 对象。
task 对象的 add_done_callback 方法可以添加回调函数,注意参数必须是回调函数,这个方法不能传入回调函数的参数,这一点需要通过 functools 模块的 partial 方法解决,将回调函数和其参数 name 作为 partial 方法的参数,此方法的返回值就是偏函数,偏函数可作为 task.add_done_callback 方法的参数。

二. 协程处理多任务
开始介绍 asyncio 模块到现在,我们还没有使用协程处理多任务。在实际项目中,往往有多个协程对象,并创建多个任务,同时在一个 loop 里运行。为了把多个协程交给 loop ,需要借助 asyncio.gather 方法。任务的 result 方法可以获得对应协程函数的 return 值。
import asyncio
import time
async def coro_work(name, t):
print(f'[coro_work] Coroutine {name} start.')
await asyncio.sleep(t)
print(f'[coro_work] Coroutine {name} finished.')
return f'Coroutine {name} OK.'
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine1 = coro_work('ONE', 3)
coroutine2 = coro_work('TWO', 1)
task1 = loop.create_task(coroutine1)
task2 = asyncio.ensure_future(coroutine2)
gather = asyncio.gather(task1, task2)
loop.run_until_complete(gather)
print(f'[task1 result] {task1.result()}')
print(f'[task2 result] {task2.result()}')
end = time.time()
print(f'运行耗时:{end - start:.4f}')
if __name__ == '__main__':
main()
await 关键字等同于 Python3.4 中的 yield from 语句,后面接协程对象。asyncio.sleep 方法的返回值为协程对象,此处为阻塞运行。
协程函数的 return 值在协程运行结束后通过调用对应 task 对象的 result 方法返回。
asyncio.gather 方法接收多个 task 作为参数,创建任务搜集器。
run_until_complete 方法也可接收任务搜集器作为参数,并阻塞运行,直到全部任务完成。任务结束后,事件循环终止,打印任务的 result 方法返回值,即协程函数的 return 值。
运行结果:在事件循环内部,2 个协程是交替运行完成的:首先运行 task1 ,打印 [coro_work] Coroutine ONE start.,task1 运行到 asyncio.sleep 阻塞,让步 CPU 的使用权给 task2 执行,打印 [coro_work] Coroutine TWO start.,task2 运行到 asyncio.sleep 阻塞,再次让步 CPU 的使用权,但此刻事件循环发现所有协程都处于阻塞状态,只能等待阻塞结束。
task2 的阻塞时间较短,阻塞 1s 后结束,打印 [coro_work] Coroutine TWO finished.;又过了 2s,阻塞 3s 的 task1 也结束了,打印 [coro_work] Coroutine ONE finished.。
至此,2 个任务全部完成,事件循环停止,打印 task1 和 task2 的返回值,任务总耗时约 3s,如果使用单线程同步模型则至少 4s 。

注意:
- 多数情况下无需调用
task的add_done_callback方法,可以直接把回调函数中的代码写入await语句后面,协程是可以暂停和恢复的。 - 多数情况下同样不需要调用
task的result方法获取协程函数的return值,因为事件循环的run_until_complete方法的返回值就是协程函数的返回值。 - 事件循环有一个
stop方法来停止循环和一个close方法来关闭循环。以上示例均没有调用loop.close方法,似乎并没有什么问题,那调用loop.close是否是必须的呢?
简言之,loop只要不关闭,就可以再次运行run_until_complete()方法,关闭后则不可运行。有人建议调用loop.close,以彻底清理loop对象防止误用,其实多数情况下并无必要。 -
asyncio提供了asyncio.gather和asyncio.wait两个任务搜集方法,它们的作用相同,都是将协程任务按顺序排定,再将返回值作为参数加入到事件循环中。
二者的主要区别在于:asyncio.wait可以获取任务的执行状态PENDINGFINISHED。当有一些特殊需求,比如某些情况下取消任务,可以使用asyncio.wait搜集器。
三. 取消任务
在事件循环启动之后,停止之前,我们可以手动取消任务的执行,但注意只有 PENDING 状态的任务才允许取消,FINISHED 状态的任务已经完成,自然无法取消。
import asyncio
async def work(id, t):
print('Working...')
await asyncio.sleep(t)
print(f'Work {id} done.')
def main():
loop = asyncio.get_event_loop()
coroutines = [work(i, i) for i in range(1, 4)]
try:
loop.run_until_complete(asyncio.gather(*coroutines))
except KeyboardInterrupt:
loop.stop() # 取消所有未完成的任务,停止事件循环
finally:
loop.close() # 关闭事件循环
if __name__ == '__main__':
main()
运行结果:程序运行过程中,按 Ctrl + C 会触发 KeyboardInterrupt 异常。捕获这个异常,将取消所有未完成的任务。

除了使用事件循环的 stop 方法取消所有未完成的任务,还可以直接调用任务的 cancel 方法,而 asyncio.Task.all_tasks 方法可以获得事件循环中的全部任务。
下面,我们修改上述实例的 main() 函数代码:
import asyncio
async def work(id, t):
print('Working...')
await asyncio.sleep(t)
print(f'Work {id} done.')
def main():
loop = asyncio.get_event_loop()
coroutines = [work(i, i) for i in range(1, 4)]
try:
loop.run_until_complete(asyncio.gather(*coroutines))
except KeyboardInterrupt:
# loop.stop() # 取消所有未完成的任务,停止事件循环
print()
tasks = asyncio.Task.all_tasks()
for task in tasks:
print(f'正在取消任务:{task}')
print(f'任务取消:{task.cancel()}')
finally:
loop.close() # 关闭事件循环
if __name__ == '__main__':
main()
运行结果: 程序运行到 work 1 done 输出时,按下 Ctrl + C 会触发 KeyboardInterrupt 异常。asyncio.Task.all_tasks() 可以捕获事件循环中的所有任务的集合,任务状态有 PENDING 和 FINISHED 两者。任务的 cancel 方法可以取消未完成的任务,取消成功返回 True ,已完成的任务由于取消失败返回 False 。











