这是Flask,Sentry的作者Armin Ronacher的一篇博客,这篇文章的影响很大,后来asyncio的文档重写就是受这篇文章影响。这篇文章写于2016.10.30。而Asyncio的一个重要的PEP525(加入了async/await语法),是2016.7.28出台的。也就是说,在PEP525之后,本文作者决定学习一下Asyncio,但是却觉得是一个大坑。

最近我详细地看了一遍Python的asyncio模块。原因是,我想要使用事件IO来做一些工作,我决定试一下Python世界最近很火的新东东。我最初感受到的是,这个asyncio系统比我预期中的要复杂的多。现在我十分确定的是,我不知道如何正确地使用它。

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

它的概念并不是很难理解,毕竟它从Twisted中借鉴了很多。但是它的很多细节,我很难搞清楚到底是什么。也许是我不够聪明,不过我还是想分享一下哪些东西让我很困惑。

原语

asyncio被设计于,通过协程来实现异步IO。最初,是通过yieldyield from表达式来实现的,不过现在它变得十分复杂。

下面是目前我必须了解的概念:

  • 事件循环(event loop)
  • 事件循环政策(event loop policy)
  • 可等待对象(awaitable)
  • 协程函数(coroutine function)
  • 旧式协程函数(old style coroutine function)
  • 协程(coroutine)
  • 协程封装器(coroutine wrapper)
  • 生成器(generator)
  • futures
  • concurrent futures
  • tasks
  • handles
  • executors
  • transports
  • protocols

除此之外,语言中还增加了下面这些特殊方法:

  • __aenter____aexit__,用来实现异步的with语句块.
  • __aiter____anext__,用来实现异步的迭代器(异步循环,和异步解析式).另外这个协议更改过。在3.5中,它返回awaitable。在3.6中,它返回异步生成器。
  • __await__,用来定义自定义awaitable。

文档中涵盖的这些知识也太多啦。不过我做了一些笔记,让一些东西可以更好理解。

事件循环(Event Loop)

asyncio中的事件循环,和你乍看之下所期望的那个事件循环有很大的不同。

表面看起来,每个线程都有一个事件循环,但是实际上它不是这么工作的。

下面是我猜想它如何工作的:

  • 如果你在主线程,那么事件循环会在你调用asyncio.get_event_loop()的时候被创建。
  • 如果你在其它线程中调用asyncio.get_event_loop(),那么会抛出一个RuntimeError。
  • 你可以在任何时候,通过asyncio.set_event_loop(),来将一个事件循环和当前的线程绑定起来。
  • 事件循环,也可以在不绑定与当前线程的时候工作。
  • asyncio.get_event_loop()返回与线程绑定的事件循环,并不是返回当前运行的那个事件循环。

这些行为组合起来,非常地让人困扰。

首先,你要知道底层的事件循环政策,这样才能明白具体的行为。默认情况下,事件循环被绑定到了线程。另外,从理论上来说,事件循环可以被绑定到greelet或者类似的东西上面。不过重要的是,库代码不能控制政策,asyncio也没有理由和线程扯上关系。

其次,asyncio并没有要求事件循环通过政策来绑定上下文。事件循环完全可以在一个隔离环境中良好地运行。这是库代码中协程,或者类似东西遇到的第一个问题,因为它们不知道由哪个事件循环来负责规划自己。这意味着,你在一个协程中调用asyncio.get_evenet_loop(),你并不知道返回的事件循环是哪个。这也是为什么所有的API都会需要一个可选的loop参数的原因。

举例来说,想要知道目前哪个协程正在运行,你不可以像直接调用Task.get_current来得到,除非你显式地传入loop:

def get_task():
    loop = asyncio.get_event_loop()
    try:
        return asyncio.Task.get_current(loop)
    except RuntimeError:
        return None

也就是说,在库代码中,你需要在任何地方都显式地传入loop,否则可能会发生非常古怪的行为。我不确定这样设计背后的考量,但是如果这里没有被修改(get_event_loop()返回当前运行的事件循环),那么就有必要在其它地方作出修改,比如要求必须传入loop参数,要求loop绑定当前上下文(比如线程)。

由于事件循环政策没有为当前上下文提供一个标志符,所以库代码可能在任何地方为当前上下文作出标识。另外,在上下文结束的时候,也没有callback可以设定。

Awaitables和Coroutines

就我个人的浅见,Python设计上的一个最大失误就是让迭代器携带了太多功能。它不仅可以用来迭代,还可以用来支持各种协程。

Python迭代器中的一个最大错误就是,如果没有捕获,StopIteration会持续冒泡。这样会在生成器或者协程终止的时候,产生很大的底层异常。Jinja开发过程中,和这个问题战斗了很久。模版引擎内部渲染原理可以看作是一个生成器,如果模版中因为某种原因出现了StopIteration,那么渲染就会结束。

Python从这个过载系统中学到的教训很少。在3.x初始版本中,asyncio还没有得到语言层面支持,所以需要使用装饰器+生成器的方式来编写协程。为了实现yield from, StopIteration会过载多次。这会导致怪异的行为:

>>> def foo(n):
...     if n in (0, 1):
...     return [1]
...     for item in range(n):
...         yield item * 2
...
>>> list(foo(0))
[]
>>> list(foo(1))
[]
>>> list(foo(2))
[0, 2]

没有错误,没有警告,但是我想结果出乎大家的意料。这是因为,在生成器函数中的return,实际上是抛出了一个StopIteration异常,并且携带一个参数值代表返回值。这个异常不会被迭代器协议抓取,只会被协程代码获取。

在3.5和3.6版本中有巨大的改变,因为现在除了生成器我们还有协程对象。可以通过在定义函数式加入前缀async来实现。例如async def x()会制造一个协程。在3.6中,异步生成器现在还会抛出AsyncStopIteration。在3.5版本,如果使用future import(generator_stop),那么如果在迭代中抛出StopIteration,它会被替换为RuntimeError

为什么我提到上面这些?因为那些旧东西未曾离开。生成器仍然有sendthrow,协程很大程度上仍然像是生成器。

为了区分那些重复之处,python引入了一些新的概念:

  • awaitable: 一个拥有__await__方法的对象。可以是原生协程,旧式协程,或者其它对象。
  • coroutinefunction: 一个返回原生协程的函数。请不要搞混淆,这不是一个返回协程的函数。
  • coroutine:原生协程。注意,在目前为止,文档中并没有把旧式的asyncio协程看作是协程。最少insepect.iscoroutine并没有把它们看作是协程。那些旧式协程,可以看作是future/awaitable这些分支。

另外特别让人困惑的是,asyncio.iscoroutinefunctioninspect.iscoroutinefunction竟然含义不同。inspect.iscoroutineinspect.iscoroutinefunction是相同的。

Coroutine Wrappers

在python看到async def的时候,它会调用一个thread local的协程封装器。它通过sys.set_coroutine_wrapper来进行调用,被封装的对象是函数。看起来像下面这样:

>>> import sys
>>> sys.set_coroutine_wrapper(lambda x: 42)
>>> async def foo():
...     pass
...
>>> foo()
__main__:1: RuntimeWarning: coroutine 'foo' was never awaited
42

在上面例子中,我没有调用开始的匿名函数,这样的示例应该可以让你看出coroutine wrapper干了什么。另外这个coroutine wrapper是thread local的,也就是说如果你调换了事件循环政策,你需要重新设定这个wrapper。新的线程也不会从父线程中继承这个。

Awaitables and Futures

一些东西是awaitable的。就目前为止,我看到下面这些都是awaitable:

  • 原生协程
  • 加入了伪造CO_ITERABLE_COROUTINE flag的生成器
  • 拥有__await__方法的对象

这些对象都有__await__方法,除了生成器因为历史原因而没有。所以CO_ITERABLE_COROUTINE这个flag是什么?它来自于coroutine wrapper(不要和sys.set_coroutine_wrapper搞混),这个wrapper是@asyncio.coroutine。这会间接地将生成器使用types.coroutine(不要和types.CoroutineType或者asyncio.coroutine混淆)来封装,它会重新创建内部的对象,并且加入一个额外的flag: CO_ITERABLE_COROUTINE.

那么什么是future呢?首先,我们要搞明白一件事:在Python3中,有两种类型的future,并且完全不兼容。包括asyncio.futures.Futureconcurrent.futures.Future。它们不是同时诞生的,但是可以同时在asyncio中使用。例如,asyncio.run_coroutine_threadsafe()会将一个协程下方到另一个线程的事件循环中,并返回一个concurrent.futures.Future,而不是一个asyncio.futures.Future对象。这讲得通,因为concurrent.futures.Future是线程安全的。

现在我们知道在asyncio有两种不兼容的future了。老实说,我不知道它们的作用,但是先可以把它们叫做“最终要发生的”。这是一个对象,最后会持有一个值,让你可以处理,但是目前这个值可能还在计算中。一些这种东西的变种叫做deferred, promises。它们之间有什么不同,老实说我也不知道。

你可以对future做什么?你可以对它加上一个callback,在future完成的时候被调用;或者加上另一个callback,在future失败的时候被调用。另外你可以对它使用await(这会实现__await__方法,所以这也是一个awaitable)。另外任何future都可以被取消。

那么你如何得到一个future呢?你可以对一个awaitable对象调用asyncio.ensure_future。这样可以把一个旧式的协程转换为future。

不过,如果你阅读了文档,你会发现asyncio.ensure_future实际返回的是一个Task。那么什么是Task呢?

Tasks

Task是一种future,它用一种特别的方式封装了一个协程。它可以像一个future一样工作,但是它还有一些额外的方法,可以用来提取协程包含的当前栈信息。我们之前提到过task,因为它有唯一一个可以用来获取当前事件循环的方法,也就是Task.get_current

另外,future和task取消的方式也有不同,但是这里不再提。如果你在编写一个协程的时候,你想要知道这个协程何时在运行,你可以通过Task.get_current来知道,不过你需要另外知道你分派的事件循环绑定在哪个线程。

不太可能知道哪个协程由哪个事件循环来运行。Task也没有提供公共API来提供这个功能。不过,如果你能过处理一个task,那么你可以通过task._loop这个属性来访问到事件循环。

Handles

Handles是一个难懂的对象,是一个用来处理待执行,不可await,但是可以取消的对象。

详细来讲,如果你通过call_soon或者call_soon_threadsafe等来规划执行,你就获得一个handle,你可以用来取消执行,但是不可以用它来等待执行完成。

Executors

你如何通知其他的线程来完成一些事情呢?你不可以在另一个线程中为当前的事件循环规划回调函数,然后获得结果。所以你需要executors。

Executors来自于concurrent.futures,它允许你将非事件型的工作交给线程完成。比如,如果你在一个事件循环中使用run_in_executor来规划一个函数。结果会以asyncio协程的方式来返回,而不是像run_coroutine_threadsafe一样返回concurrent协程。我没有足够的心力来理解为什么存在这些API,不知道何时使用哪个API。文档中建议,executor可以用来执行多进程的事情。

Transport and Protocols

这些东西基本拷贝自twisted,如果你需要理解它们,就去阅读文档吧。

如何使用Asyncio

现在我们粗略的理解了asyncio,另外我找到一些人们编写asyncio代码的常见模式:

  • 将loop传入所有的协程。社区中相当一部分的人都是这么做的。让协程知道自己被哪个loop来规划,让协程可以做类似task的事情。
  • 另外,你可以要求loop绑定线程。理想情况下这是一个好办法,不过可惜社区存在割裂。
  • 如果你想要使用上下文数据(类似thread local),现在没有什么好办法。最受欢迎的实现方式是第三方库aiolocals,但是它需要你手动将信息传播,因为解释器现在还不支持。
  • 忘记Python中存在的旧式协程。请使用Python3.5以上版本,比只使用async/await关键字。使用新的协程,可以使用异步上下文管理器,这对于资源管理来说相当有用。
  • 学会重启loop来清理。这里我花了很长时间才明白,它不是我意料之中的方式,但是是现在最有用的方法,定时地将loop重启,可以清除那些遗留下来没有执行的协程。
  • 使用subprocess的方式不清晰。你需要有一个loop运行在主线程(我认为是用来监听signal事件的),然后把subprocess分派给其他的loop。用如下的方式asyncio.get_child_watcher().attach_loop(...).
  • 想要同时编写异步和同步代码,注定是要失败的。另外如果要对对象同时支持withasync with也是很危险的。
  • 如果你想要给一个协程设置名称,用来在调试的时候知道为什么它没有被await。设置__name__是没有用的,你需要使用__qualname__
  • 有时候内部类型转换会让你发疯。

上下文数据

除了异常的复杂度,我思考使用asycio编写好的API,还缺少一个东西,就是context local数据。这个东西已经被node社区学会了。

有一个continuation-local-storage已经被接受,但是实现地太晚了。

令人失望的是,在python中目前还没有任何store可以用。我一直在关注,因为我一直想要使用asyncio来支持Sentry的breadcrumbs,但是还没有看到好的办法。asyncio中没有context的概念,因为如果不使用monkeypatch,从代码中看不出你使用的是哪个loop,也就不能获取信息。

Node目前一直在想要为这个问题找到一个长期的处理方法。这个问题对于任何生态都是不可忽略的。这个问题叫做named async context propagation,解决方式有各种名字。在Go中,需要使用context包,并且显示地传入所有的goroutine中(不是一个很好的方式,但是最少也提供了解决方案)。.NET对于local context有着最佳解决方案。它可以是一个线程上下文,一个web请求上下文,或者类似的东西,它们都会自动向上传播除非你抑制它。微软为了解决这个问题,我相信已经花了15年的时间。

我不知道asyncio生态是否足够年轻,可以从逻辑上让context加入,但是我认为应该现在开始做。

个人想法

asycnio已经很复杂,并且会变得更加复杂。我没有足够的心智能力来使用asyncio做日常工作。理解它需要不断地知道语言改动,并且它对语言带来了巨大的复杂性。也许它还需要数年时间,才可以带来享受并且稳定的开发体验。

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄