Nemo
Nemo
笔记-Python中asyncio与异步编程
笔记-Python中asyncio与异步编程

从线程到协程

多进程与多线程

  • 所谓多进程,就是系统可以同时运行多个任务
  • 在操作系统中,每个任务就是一个进程。每个进程至少做一件事,多数进程会做很多事;
  • 在一个进程中,就有很多线程,每个线程做一件事,在一个进程中有多个线程运行就是多线程。
  • 一个 CPU 在某一时刻只能做一项任务,即在一个进程(或线程)中工作,当它闲置时,会被系统派到其它进程中,进行进程 、线程切换是由操作系统决定的,无法人为干预

线程安全

  • 原子操作:将两步操作设计成一个事务,事务里可以有多个步骤,其中任何一步出现问题,事务都将失败,前面的步骤全部回滚,就像什么事都没发生。这种操作就叫做原子操作,这种特性就叫做原子性

  • 在 Python 多线程中,变量是共享的,这也是相较多进程的一个优点,线程占用资源要少得多,但也导致多个 CPU 同时操作多个线程时会引起结果无法预测的问题,也就是说 Python 的线程不安全。

GIL 全局解释器锁

  • CPython 解释器使用加锁的方法解决线程安全问题,每个进程有一把锁,启动线程先加锁,结束线程释放锁。

    打个比方,进程是一个厂房,厂房大门是开着的,门内有锁,工人进入大门后可以在内部上锁。厂房里面有 10 个车间对应 10 个线程,每个 CPU 就是一个工人。GIL(Global Interpreter Lock)全局锁就相当于厂房规定:工人要到车间工作,从厂房大门进去后要在里面反锁,完成工作后开锁出门,下一个工人再进门上锁。也就是说,任意时刻厂房里只能有一个工人,但这样就保证了工作的安全性,这就是 GIL 的原理。

  • GIL的优点包括简化 CPython 解释器和大量扩展的实现,缺点是GIL 实现了线程操作的安全性,但多线程的效率被大打折扣,GIL 不是语言特性,而是解释器的设计特点

多线程提高工作效率

接着上面的例子说,一个工厂里同一时刻只能有一个工人在工作,如果这个工厂里各个车间的自动化程度极高且任务耦合度极低,工人进去只是按几下按钮,就可以等待机器完成其余工作,那情况就不一样了,这种场景下一个工人可以管理好多个车间。

比如爬虫程序爬取页面数据这个场景中,CPU 做的事就是发起页面请求和处理响应数据,这两步是极快的,中间网络传输数据的过程是耗时且不占用 CPU 的。一个工人可以在吃完早饭后一分钟内快速到 1000 个车间按下发起请求的按钮,吃完午饭睡一觉,日薄西山时差不多收到网络传回的数据,又用一分钟处理数据,整个程序完成。

  • 复杂程序的分类:CPU 密集型IO 密集型

使用 time.sleep 方法模拟 IO 操作来写一段程序证明多线程可以提高程序的运行效率:

import threading
import time

def crawl_url():        # 假设这是爬虫程序,爬取一个 URL
    time.sleep(0.02)    # 模拟 IO 操作

def main1():            # 单线程程序
    for i in range(100):
        crawl_url()

def main2():            # 多线程程序
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=crawl_url)
        t.start()
        thread_list.append(t)
    for t in thread_list:
        t.join()

if __name__ == '__main__':
    start = time.time()
    main1()
    end = time.time()
    print('单线程耗时:{:.4f}s'.format(end - start))
    start = time.time()
    main2()
    end = time.time()
    print('多线程耗时:{:.4f}s'.format(end - start))

程序运行结果:

单线程耗时:2.0769s
多线程耗时:0.0626s

理论上,main1 的耗时是 main2 的 100 倍,考虑到 main2 创建多线程、线程切换的开销,这个结果也是相当可观的,IO 操作耗时越长,多线程的威力越大。

异步和同步,阻塞和非阻塞

  • 前一个完成后才能运行下一个,这就是同步的概念

上文的模拟爬虫示例代码中,main1 中的 for 循环运行 100 次爬取网页的操作就是同步操作

  • 阻塞操作,线程无法向下执行

在 crawl_url 函数内部的 IO 操作为阻塞操作

  • 非阻塞操作,不会等一个线程运行完成才创建下一个线程

main2 中的第一个 for 循环,创建 100 个线程并启动,这步操作是非阻塞的,它会一气儿创建 100 个线程;第二个 for 循环将主线程挂起,直到全部子线程完成,此时的主线程就是阻塞的。

  • 所谓的异步,就是 CPU 在当前线程阻塞时可以去其它线程中工作

main2 中的第一个 for 循环,创建 100 个线程并启动,这步操作是非阻塞的,不会等一个线程运行完成才创建下一个线程,它会一气儿创建 100 个线程;第二个 for 循环将主线程挂起,直到全部子线程完成,此时的主线程就是阻塞的。这种程序运行方式叫做异步

不管怎么设计,在一个线程内部代码都是顺序执行的,遇到 IO 都得阻塞,所谓的非阻塞,是遇到当前线程阻塞时,CPU 去其它线程工作。

协程初步

  • 协程是在线程的基础上编写由程序员决定代码执行顺序、可以互相影响的高耦合度代码一种高级程序设计模式

  • 一个线程内部可以有多个协程,相当于一个车间内部有多个子任务,一个协程遇到 IO 阻塞,CPU 会自动去另一个协程中工作,而且去哪里工作由程序自己说了算。

生成器原理

  • 生成器可谓协程的立身之基。

谈生成器。这就要提到经典的斐波那契数列:

In [163]: def fibonacci(n):
     ...:     a, b = 0, 1
     ...:     while b < n:
     ...:         a, b = b, a + b
     ...:         yield a
     ...:

In [164]: f = fibonacci(100)

In [165]: f
Out[165]: <generator object fibonacci at 0x112be2b88>

In [166]: for i in f:
     ...:     print(i)
     ...:
1
1
2
3
5
8
13
21
34
55
89
  • 函数体内部有 yield 关键字的都是生成器函数。

fibonacci 是生成器函数。

  • yield 关键字只能出现在函数中,生成器函数的执行结果是生成器。

注意这里所讲的 “执行结果” 不是函数的 return 值。生成器终止时必定抛出 StopIteration 异常,for 循环可以捕获此异常,异常的 value 属性值为生成器函数的 return 值。

  • 生成器还可以使用 next 方法迭代。生成器会在 yield 语句处暂停,这是至关重要的,未来协程中的 IO 阻塞就出现在这里。

生成器进化成协程

  • 生成器是由迭代器进化而来,所以生成器对象有 __iter____next__ 方法。

生成器可以使用 for 循环获得值,注意这里所说的 “获得值” 指的是下文代码块里 yield 语句中 yield 关键字后面的 i 。

  • 生成器对象有sendthrowclose 方法。这三个方法的作用分别是发送数据给生成器并赋值给 yield 语句向生成器中抛入异常由生成器内部处理终止生成器。这三个方法使得生成器进化成协程。

  • 协程有四种存在状态:

    • GEN_CREATED 创建完成,等待执行
    • GEN_RUNNING 解释器正在执行(这个状态在下面的示例程序中无法看到)
    • GEN_SUSPENDED 在 yield 表达式处暂停
    • GEN_CLOSE 执行结束,生成器停止

可以使用 inspect.getgeneratorstate 方法查看协程的当前状态,举例如下:

In [202]: import inspect

In [203]: def generator():
     ...:     i = '激活生成器'
     ...:     while True:
     ...:         try:
     ...:             value = yield i
     ...:         except ValueError:
     ...:             print('OVER')
     ...:         i = value
     ...:

In [204]: g = generator()  # 1

In [205]: inspect.getgeneratorstate(g)  # 2
Out[205]: 'GEN_CREATED'

In [206]: next(g)  # 3
Out[206]: '激活生成器'

In [207]: inspect.getgeneratorstate(g)
Out[207]: 'GEN_SUSPENDED'

In [208]: g.send('Hello Shiyanlou')  # 4
Out[208]: 'Hello Shiyanlou'

In [209]: g.throw(ValueError)  # 5
OVER
Out[209]: 'Hello Shiyanlou'

In [210]: g.close()  # 6

In [211]: inspect.getgeneratorstate(g)
Out[211]: 'GEN_CLOSED'

In [212]:

代码说明如下:

1、创建生成器

2、查看生成器状态

3、这步操作叫做预激生成器(或协程),这是必须做的。在生成器创建完成后,需要将其第一次运行到 yield 语句处暂停

4、暂停状态的生成器可以使用 send 方法发送数据,此方法的参数就是 yield 表达式的值,也就是 yield 表达式等号前面的 value 变量的值变成 ‘Hello Shiyanlou’,继续向下执行完一次 while 循环,变量 i 被赋值,继续运行下一次循环,yield 表达式弹出变量 i

5、向生成器抛入异常,异常会被 try except 捕获,作进一步处理

6、close 方法终止生成器,异常不会被抛出

因为生成器的调用方也就是程序员自己可以控制生成器的启动、暂停、终止,而且可以向生成器内部传入数据,所以这种生成器又叫做协程,generator 函数既可以叫做生成器函数,也可以叫协程函数,这是生成器向协程的过渡阶段。

预激协程

  • 预先激活生成器(或协程)可以使用 next 方法,也可以使用生成器的 send 方法发送 None 值:g.send(None)

为简化协程的使用,我们可以尝试编写一个装饰器来预激协程,这样创建的协程会立即进入 GEN_SUSPENDED 状态,可以直接使用 send 方法:

In [212]: from functools import wraps

In [213]: def coroutine(func):  # 预激协程装饰器
     ...:     @wraps(func)      # wraps 装饰器保证 func 函数的签名不被修改
     ...:     def wrapper(*args, **kw):
     ...:         g = func(*args, **kw)
     ...:         next(g)       # 预激协程
     ...:         return g      # 返回激活后的协程
     ...:     return wrapper
     ...:

In [214]: @coroutine            # 使用装饰器重新创建协程函数
     ...: def generator():
     ...:     i = '激活生成器'
     ...:     while True:
     ...:         try:
     ...:             value = yield i
     ...:         except ValueError:
     ...:             print('OVER')
     ...:         i = value
     ...:

In [215]: g = generator()

In [216]: inspect.getgeneratorstate(g)
Out[216]: 'GEN_SUSPENDED'

协程的返回值

  • StopIteration 异常的 value 属性值为生成器(协程)函数的 return 值。

我们可以在使用协程时捕获异常并得到这个值。举例如下:

In [217]: @coroutine
     ...: def generator():
     ...:     l = []                    # 1
     ...:     while True:
     ...:         value = yield         # 2
     ...:         if value == 'CLOSE':  # 3
     ...:             break
     ...:         l.append(value)       # 4
     ...:     return l                  # 5
     ...:

In [218]: g = generator()

In [219]: g.send('hello')

In [220]: g.send('nemo')

In [221]: g.send('CLOSE')
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-221-863c90462435> in <module>
----> 1 g.send('CLOSE')

StopIteration: ['hello', 'nemo']

In [222]:

代码说明如下:

1、创建列表,保存协程 send 方法每次发送的参数

2、yield 表达式不弹出值,仅作暂停之用

3、如果 send 方法的参数为 CLOSE ,break 终止 while 循环,停止生成器,抛出 StopIteration 异常

4、将 value 添加到列表

5、设置协程函数的返回值,该值在协程终止抛出 StopIteration 异常时赋值给 value 属性

可以这样捕获异常:

In [231]: g = generator()

In [232]: for i in ('hello', 'nemo', 'CLOSE'):
     ...:     try:
     ...:         g.send(i)
     ...:     except StopIteration as e:
     ...:         value = e.value  # e 的 value 属性值就是协程函数的 return 值
     ...:         print('END')
     ...:
END

In [233]: value
Out[233]: ['hello', 'nemo']

yield from

  • 相比 yield ,该语法有两大优势:
    • 避免嵌套循环:

我们知道 Python 内置模块 itertools 是十分强大的,里面有很多实用的方法,其中之一是 chain 方法,它可以接受任意数量的可迭代对象作为参数,返回一个可以包含所有参数中的元素的迭代器:

In [8]: from itertools import chain

In [9]: c = chain({'one', 'two'}, list('ace'))

In [10]: c
Out[10]: <itertools.chain at 0x1066020f0>

In [11]: for i in c:
    ...:     print(i)
    ...:
one
two
a
c
e

使用 yield 关键字实现 chain 方法:

In [16]: def chain(*args):
    ...:     for iter_obj in args:
    ...:         for i in iter_obj: #嵌套循环出现
    ...:             yield i # 注意这里 chain 函数的返回值是生成器
    ...:

In [17]: c = chain({'one', 'two'}, list('ace'))

In [18]: c
Out[18]: <generator object chain at 0x1066ff570>

In [19]: for i in c:
    ...:     print(i)
    ...:
one
two
a
c
e

现在我们使用 Python 3.3 新增的 yield from 语法优化上文的 chain 函数:

In [20]: def chain(*args):
    ...:     for iter_obj in args:
    ...:         yield from iter_obj
    ...:

In [21]: c = chain({'one', 'two'}, list('ace'))

In [22]: c
Out[22]: <generator object chain at 0x106a95b88>

In [23]: for i in c:
    ...:     print(i)
    ...:
one
two
a
c
e

可以看到 yield from 语句可以替代 for 循环,避免了嵌套循环。同 yield 一样,yield from 语句也只能出现在函数体内部有 yield from 语句的函数叫做协程函数或生成器函数

yield from 后面接收一个可迭代对象,例如上面代码中的 iter_obj 变量,在协程中,可迭代对象往往是协程对象,这样就形成了嵌套协程

转移控制权

  • 转移控制权是 yield from 语法的核心功能,也是从生成器进化到协程的最重要一步。

下面举例说明转移控制权的功能,将以下代码写入 transfer_control.py 文件中,这是一个可以将列表进行排序的程序。对代码的注释写入每行代码前面或后面,方便阅读:

import time
from faker import Faker
from functools import wraps

# 预激协程装饰器
def coroutine(func):
    @wraps(func)
    def wrapper(*args, **kw):
        g = func(*args, **kw)
        next(g)
        return g
    return wrapper

# 子生成器函数,这个生成器是真正做事的生成器
def sub_coro():
    l = []                      # 创建空列表
    while True:                 # 无限循环
        value = yield           # 调用方使用 send 方法发生数据并赋值给 value 变量
        if value == 'CLOSE':    # 如果调用方发生的数据是 CLOSE ,终止循环
            break
        l.append(value)         # 向列表添加数据
    return sorted(l)            # 返回排序后的列表

# 使用预激协程装饰器
# 带有 yield from 语句的父生成器函数
@coroutine
def dele_coro():
    # while True 可以多次循环,每次循环会创建一个新的子生成器 sub_coro()
    # 这里 while 只循环一次,这是由调用方,也就是 main 函数决定的
    # while 循环可以捕获函数本身创建的父生成器终止时触发的 StopIteration 异常
    while True:
        # yield from 会自动预激子生成器 sub_coro()
        # 所以 sub_coro 在定义时不可以使用预激协程装饰器
        # yield from 将捕获子生成器终止时触发的 StopIteration 异常
        # 并将异常的 value 属性值赋值给等号前面的变量 l
        # 也就是 l 变量的值等于 sub_coro 函数的 return 值
        # yield from 还实现了一个重要功能
        # 就是父生成器的 send 方法将发送值给子生成器
        # 并赋值给子生成器中 yield 语句等号前面的变量 value
        l = yield from sub_coro()
        print('排序后的列表:', l)
        print('------------------')

# 调用父生成器的函数,也叫调用方
def main():
    # 生成随机国家代号的方法
    fake = Faker().country_code
    # 嵌套列表,每个子列表中有三个随机国家代号(字符串)
    nest_country_list = [[fake() for i in range(3)] for j in range(3)]
    for country_list in nest_country_list:
        print('国家代号列表:', country_list)
        c = dele_coro()      # 创建父生成器
        for country in country_list:
            c.send(country)  # 父生成器的 send 方法将国家代号发送给子生成器
        # CLOSE 将终止子生成器中的 while 循环
        # 子生成器的 return 值赋值给父生成器 yield from 语句中等号前面的变量 l
        c.send('CLOSE')

if __name__ == '__main__':
    main()

运行结果如下:

$ python3 transfer_control.py
国家代号列表: ['MH', 'EC', 'MD']
排序后的列表: ['EC', 'MD', 'MH']
------------------
国家代号列表: ['ML', 'GA', 'EC']
排序后的列表: ['EC', 'GA', 'ML']
------------------
国家代号列表: ['ER', 'BW', 'CO']
排序后的列表: ['BW', 'CO', 'ER']
------------------

所谓 “转移控制权” 就是 yield from 语法可以将子生成器的控制权交给调用方 main 函数,在 main 函数内部创建父生成器 c ,控制 c.send 方法传值给子生成器。这是一个巨大的进步,在此基础上,Python 3.4 新增了创建协程的装饰器,这样非生成器函数的协程函数就正式出现了。

第一节小结

  • 线程的工作原理,为什么线程可以提高程序的执行效率
  • 协程的由来,相比线程有什么优势
  • 从生成器到协程,Python 2.5 出现的特性
  • 协程的预激和协程函数的返回值
  • yield from 语法

asyncio 模块

协程装饰器

  • Python 3.4 中使用 asyncio.coroutine 装饰器的函数就是真正的协程函数了。

任务和事件循环

coroutine 协程

  • 使用 asyncio.coroutine 装饰器装饰的函数被称作协程函数,它的调用不会立即执行函数,而是返回一个协程对象。
  • 协程对象需要包装成任务注入到事件循环,由事件循环调用。

task 任务

  • 将协程对象作为参数创建任务,任务是对协程进一步封装,其中包含任务的各种状态。

event_loop 事件循环

  • 事件循环能够控制任务运行流程,也就是任务的调用方。
In [50]: import time

In [51]: import asyncio

In [52]: def one():
    ...:     start = time.time()
    ...:
    ...:     @asyncio.coroutine   # 1
    ...:     def do_some_work():  # 2
    ...:         print('start coroutine')
    ...:         time.sleep(0.1)  # 3
    ...:         print('This is a coroutine')
    ...:
    ...:     loop = asyncio.get_event_loop()     # 4
    ...:     coroutine = do_some_work()          # 5
    ...:     loop.run_until_complete(coroutine)  # 6
    ...:
    ...:     end = time.time()
    ...:     print('运行耗时:{:.4f}'.format(end - start))  # 7
    ...:

In [53]: one()
start coroutine
This is a coroutine
运行耗时:0.1062

代码说明:

1、使用协程装饰器创建协程函数

2、协程函数

3、模拟 IO 操作

4、创建事件循环。每个线程中只能有一个事件循环,get_event_loop 方法会获取当前已经存在的事件循环,如果当前线程中没有,新建一个

5、协程对象

6、将协程对象注入到事件循环,协程的运行由事件循环控制。事件循环的 run_until_complete 方法会阻塞运行,直到任务全部完成。协程对象作为 run_until_complete 方法的参数,loop 会自动将协程对象包装成任务来运行。后面我们会讲到多个任务注入事件循环的情况

7、打印程序运行耗时

任务状态

  • 协程对象不能直接运行,必须放入事件循环中或者由 yield from 语句调用。

  • 将协程对象注入事件循环的时候,其实是 run_until_complete 方法将协程包装成了一个任务(task)对象,任务对象保存了协程运行后的状态,用于未来获取协程的结果。

    In [56]: def two():
      ...:     start = time.time()
      ...:
      ...:     @asyncio.coroutine
      ...:     def do_some_work():
      ...:         print('start coroutine')
      ...:         time.sleep(0.1)
      ...:         print('This is a coroutine')
      ...:
      ...:     loop = asyncio.get_event_loop()
      ...:     coroutine = do_some_work()
      ...:     task = loop.create_task(coroutine)  # 1
      ...:     print('task 是不是 asyncio.Task 的实例?', isinstance(task, asyncio.Task))  # 2
      ...:     print('Task state:', task._state)   # 3
      ...:     loop.run_until_complete(task)       # 4
      ...:     print('Task state:', task._state)
      ...:
      ...:     end = time.time()
      ...:     print('运行耗时:{:.4f}'.format(end - start))
      ...:
    
    In [57]: two()
    task 是不是 asyncio.Task 的实例? True
    Task state: PENDING
    start coroutine
    This is a coroutine
    Task state: FINISHED
    运行耗时:0.1052
    
    

    代码说明:

    1、事件循环的 create_task 方法可以创建任务,另外 asyncio.ensure_future 方法也可以创建任务,参数须为协程对象

    2、task 是 asyncio.Task 类的实例,为什么要使用协程对象创建任务?因为在这个过程中 asyncio.Task 做了一些工作,包括预激协程、协程运行中遇到某些异常时的处理

    3、task 对象的 _state 属性保存当前任务的运行状态,任务的运行状态有 PENDING 和 FINISHED 两种

    4、将任务注入事件循环,阻塞运行

async / await 关键字

  • Python 3.5 中新增了 async / await 关键字用来定义协程函数。
  • 这两个关键字是一个组合,其作用等同于 asyncio.coroutine 装饰器和 yield from 语句。此后协程与生成器就彻底泾渭分明了。

绑定回调

假如协程包含一个 IO 操作(这几乎是肯定的),等它处理完数据后,我们希望得到通知,以便下一步数据的处理。这一需求可以通过往 future 对象中添加回调来实现。

  • task对象就是future对象
  • 因为 asyncio.Task 是 asyncio.Future 的子类。也就是说,task 对象可以添加回调函数。回调函数的最后一个参数是 future 或 task 对象,通过该对象可以获取协程返回值。如果回调需要多个参数,可以通过偏函数导入。

简言之,一个任务完成后需要捎带运行的代码可以放到回调函数中。修改上一个程序如下:

In [64]: def three():
    ...:     start = time.time()
    ...:
    ...:     # @asyncio.coroutine
    ...:     async def corowork():      # 1
    ...:         print('[corowork] Start coroutine')
    ...:         time.sleep(0.1)
    ...:         print('[corowork] This is a coroutine')
    ...:
    ...:     def callback(name, task):  # 2
    ...:         print('[callback] Hello {}'.format(name))
    ...:         print('[callback] coroutine state: {}'.format(task._state))
    ...:
    ...:     loop = asyncio.get_event_loop()
    ...:     coroutine = corowork()
    ...:     task = loop.create_task(coroutine)
    ...:     task.add_done_callback(functools.partial(callback, 'Shiyanlou'))  # 3
    ...:     loop.run_until_complete(task)
    ...:
    ...:     end = time.time()
    ...:     print('运行耗时:{:.4f}'.format(end - start))
    ...:

In [65]: import functools

In [66]: three()
[corowork] Start coroutine
[corowork] This is a coroutine
[callback] Hello Shiyanlou
[callback] coroutine state: FINISHED
运行耗时:0.1051

代码说明:

1、使用 async 关键字替代 asyncio.coroutine 装饰器创建协程函数

2、回调函数,协程终止后需要顺便运行的代码写入这里,回调函数的参数有要求,最后一个位置参数须为 task 对象

3、task 对象的 add_done_callback 方法可以添加回调函数,注意参数必须是回调函数,这个方法不能传入回调函数的参数,这一点需要通过 functools 模块的 partial 方法解决,将回调函数和其参数 name 作为 partial 方法的参数,此方法的返回值就是偏函数,偏函数可作为 task.add_done_callback 方法的参数

多任务

实际项目中,往往有多个协程创建多个任务对象,同时在一个 loop 里运行。

  • 为了把多个协程交给 loop,需要借助 asyncio.gather 方法。任务的 result 方法可以获得对应的协程函数的 return 值。

修改上文的程序如下:

In [67]: def four():
    ...:     start = time.time()
    ...:
    ...:     async def corowork(name, t):
    ...:         print('[corowork] Start coroutine', name)
    ...:         await asyncio.sleep(t)                  # 1
    ...:         print('[corowork] Stop coroutine', name)
    ...:         return 'Coroutine {} OK'.format(name)   # 2
    ...:
    ...:     loop = asyncio.get_event_loop()
    ...:     coroutine1 = corowork('ONE', 3)             # 3
    ...:     coroutine2 = corowork('TWO', 1)             # 3
    ...:     task1 = loop.create_task(coroutine1)        # 4
    ...:     task2 = loop.create_task(coroutine2)        # 4
    ...:     gather = asyncio.gather(task1, task2)       # 5
    ...:     loop.run_until_complete(gather)             # 6
    ...:     print('[task1] ', task1.result())           # 7
    ...:     print('[task2] ', task2.result())           # 7
    ...:
    ...:     end = time.time()
    ...:     print('运行耗时:{:.4f}'.format(end - start))
    ...:

In [68]: four()
[corowork] Start coroutine ONE
[corowork] Start coroutine TWO
[corowork] Stop coroutine TWO
[corowork] Stop coroutine ONE
[task1]  Coroutine ONE OK
[task2]  Coroutine TWO OK
运行耗时:3.0070

代码说明:

1、await 关键字等同于 Python 3.4 中的 yield from 语句,后面接协程对象。asyncio.sleep 方法的返回值为协程对象,这一步为阻塞运行。asyncio.sleep 与 time.sleep 是不同的,前者阻塞当前协程,即 corowork 函数的运行,而 time.sleep 会阻塞整个线程,所以这里必须用前者,阻塞当前协程,CPU 可以在线程内的其它协程中执行

2、协程函数的 return 值可以在协程运行结束后保存到对应的 task 对象的 result 方法中

3、创建两个协程对象,在协程内部分别阻塞 3 秒和 1 秒

4、创建两个任务对象

5、将任务对象作为参数,asyncio.gather 方法创建任务收集器。注意,asyncio.gather 方法中参数的顺序决定了协程的运行顺序

6、将任务收集器作为参数传入事件循环的 run_until_complete 方法,阻塞运行,直到全部任务完成

7、任务结束后,事件循环停止,打印任务的 result 方法返回值,即协程函数的 return 值

到这一步,大家应该可以看得出,上面的代码已经是异步编程的结构了,在事件循环内部,两个协程是交替运行完成的。简单叙述一下程序协程部分的运行过程:

-> 首先运行 task1

-> 打印 [corowork] Start coroutine ONE

-> 遇到 asyncio.sleep 阻塞

-> 释放 CPU 转到 task2 中执行

-> 打印 [corowork] Start coroutine TWO

-> 再次遇到 asyncio.sleep 阻塞

-> 这次没有其它协程可以运行了,只能等阻塞结束

-> task2 的阻塞时间较短,阻塞 1 秒后先结束,打印 [corowork] Stop coroutine TWO

-> 又过了 2 秒,阻塞 3 秒的 task1 也结束了阻塞,打印 [corowork] Stop coroutine ONE

-> 至此两个任务全部完成,事件循环停止

-> 打印两个任务的 result

-> 打印程序运行时间

-> 程序全部结束

需要额外说明的几点:

1、多数情况下无需调用 task 的 add_done_callback 方法,可以直接把回调函数中的代码写入 await 语句后面,协程是可以暂停和恢复的

2、多数情况下同样无需调用 task 的 result 方法获取协程函数的 return 值,因为事件循环的 run_until_complete 方法的返回值就是协程函数的 return 值。修改上文 # 6 、7 的代码如下:

    ...:     result = loop.run_until_complete(gather)
    ...:     print(result)

再次运行结果为:

In [73]: four()
[corowork] Start coroutine ONE
[corowork] Start coroutine TWO
[corowork] Stop coroutine TWO
[corowork] Stop coroutine ONE
['Coroutine ONE OK', 'Coroutine TWO OK']  # 变量 result 的值
运行耗时:3.0045

3、事件循环有一个 stop 方法用来停止循环和一个 close 方法用来关闭循环。以上示例中都没有调用 loop.close 方法,似乎并没有什么问题。所以到底要不要调用 loop.close 呢?简单来说,loop 只要不关闭,就还可以再次运行 run_until_complete 方法,关闭后则不可运行。有人会建议调用 loop.close,彻底清理 loop 对象防止误用,其实多数情况下根本没有这个必要。

4、asyncio 模块提供了 asyncio.gather 和 asyncio.wait 两个任务收集方法,它们的作用相同,都是将协程任务按顺序排定,再将返回值作为参数加入到事件循环中。前者在上文已经用到,后者与前者的区别是它可以获取任务的执行状态(PENING & FINISHED),当有一些特别的需求例如在某些情况下取消任务,可以使用 asyncio.wait 方法。

小结

  • 协程装饰器
  • 任务和事件循环
  • 任务的状态
  • async / await 关键字
  • 绑定回调
  • 多任务

异步编程

取消任务

在事件循环启动之后停止之前,我们可以手动取消任务的执行,注意 PENDING 状态的任务才能被取消,FINISHED 状态的任务已经完成,不能取消。

事件循环的 cancel 方法

import asyncio

async def work(id, t):
    print('Wroking...')
    await asyncio.sleep(t)
    print('Work {} done'.format(id))

def main():
    loop = asyncio.get_event_loop()
    coroutines = [work(i, i) for i in range(1, 4)]            # 1
    try:
        loop.run_until_complete(asyncio.gather(*coroutines))  # 2
    except KeyboardInterrupt:
        loop.stop()    # 3
    finally:
        loop.close()   # 4

if __name__ == '__main__':
    main()

代码说明:

1、创建一个列表,列表中有 3 个协程对象,协程内部分别阻塞 1 – 3 秒

2、程序运行过程中,快捷键 Ctrl + C 会触发 KeyboardInterrupt 异常。捕获这个异常,在程序终止前完成 # 3 和 # 4 代码的执行

3、事件循环的 stop 方法取消所有任务,停止事件循环

4、关闭事件循环

代码运行结果:

$ python3 async_cancel.py
Wroking...
Wroking...
Wroking...
Work 1 done
^C%

程序运行过程:

-> 首先,id 为 1 的协程先启动运行

-> 打印 Working…

-> 遇到 IO 阻塞,释放 CPU ,CPU 去到 id 为 2 的协程中运行

-> 同样首先打印 Working…

-> 遇到 IO 阻塞,同样释放 CPU ,第三个协程开始运行,打印 Working…

-> 以上步骤瞬间完成,这时候的 loop 中全部协程处于阻塞状态

-> 一秒钟后,id 为 1 的协程结束阻塞

-> 打印 Work 1 done

-> 然后手动按下快捷键 Ctrl + C ,触发 KeyboardInterrupt 异常

-> try except 语句捕获异常,执行 # 3 和 # 4

-> 程序运行完毕

task 的 cancel 方法

任务的 cancel 方法也可以取消任务,而 asyncio.Task.all_tasks 方法可以获得事件循环中的全部任务。修改上文代码中的 main 函数如下:

def main():
    loop = asyncio.get_event_loop()
    coroutines = [work(i, i) for i in range(1, 4)]
    # 程序运行过程中,快捷键 Ctrl + C 会触发 KeyboardInterrupt 异常
    try:
        loop.run_until_complete(asyncio.gather(*coroutines))
    except KeyboardInterrupt:
        print()
        # 每个线程里只能有一个事件循环
        # 此方法可以获得事件循环中的所有任务的集合
        # 任务的状态有 PENDING 和 FINISHED 两种
        tasks = asyncio.Task.all_tasks()
        for i in tasks:
            print('取消任务:{}'.format(i))
            # 任务的 cancel 方法可以取消未完成的任务
            # 取消成功返回 True ,已完成的任务取消失败返回 False
            print('取消状态:{}'.format(i.cancel()))
    finally:
        loop.close()

再次执行结果如下:

$ python3 async_cancel.py
Wroking...
Wroking...
Wroking...
Work 1 done
^C
取消任务:<Task finished coro=<work() done, defined at a.py:5> result=None>
取消状态:False
取消任务:<Task pending coro=<work() running at a.py:7> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x102cd8a38>()]> cb=[gather.<locals>._done_callback() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:664]>
取消状态:True
取消任务:<Task pending coro=<work() running at a.py:7> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x102cd8a98>()]> cb=[gather.<locals>._done_callback() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:664]>
取消状态:True

排定任务

  • 排定 task / future 在事件循环中的执行顺序,也就是对应的协程先执行哪个,遇到 IO 阻塞时,CPU 转而运行哪个任务,这是我们在进行异步编程时的一个需求。
  • 多任务程序中,事件循环里的任务的执行顺序由 asyncio.ensure_future / loop.create_task 和 asyncio.gather 排定。

loop.run_forever

  • 事件循环的 run_until_complete 方法运行事件循环,当其中的全部任务完成后,自动停止事件循环;
  • run_ferever 方法为无限运行事件循环,需要自定义 loop.stop 方法并执行之才会停止。
import asyncio

async def work(loop, t):
    print('start')
    await asyncio.sleep(t)  # 模拟 IO 操作
    print('after {}s stop'.format(t))
    loop.stop()             # 停止事件循环,stop 后仍可重新运行

loop = asyncio.get_event_loop()             # 创建事件循环
task = asyncio.ensure_future(work(loop, 1)) # 创建任务,该任务会自动加入事件循环
loop.run_forever()  # 无限运行事件循环,直至 loop.stop 停止
loop.close()        # 关闭事件循环,只有 loop 处于停止状态才会执行

运行程序:

$ python3 run_forever.py
start
after 1s stop

以上是单任务事件循环,将 loop 作为参数传入协程函数创建协程,在协程内部执行 loop.stop 方法停止事件循环。下面是多任务事件循环,使用回调函数执行 loop.stop 停止事件循环,修改 run_forever.py 文件如下:

import time
import asyncio
import functools

def loop_stop(loop, future):    # 函数的最后一个参数须为 future / task
    loop.stop()                 # 停止事件循环,stop 后仍可重新运行

async def work(t):              # 协程函数
    print('start')
    await asyncio.sleep(t)      # 模拟 IO 操作
    print('after {}s stop'.format(t))

def main():
    loop = asyncio.get_event_loop()
    # 创建任务收集器,参数为任意数量的协程,任务收集器本身也是 task / future 对象
    tasks = asyncio.gather(work(1), work(2))
    # 任务收集器的 add_done_callback 方法添加回调函数
    # 当所有任务完成后,自动运行此回调函数
    # 注意 add_done_callback 方法的参数是回调函数
    # 这里使用 functools.partial 方法创建偏函数以便将 loop 作为参数加入
    tasks.add_done_callback(functools.partial(loop_stop, loop))
    loop.run_forever()  # 无限运行事件循环,直至 loop.stop 停止
    loop.close()        # 关闭事件循环

if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time()
    print('耗时:{:.4f}s'.format(end - start))

运行程序:

$ python3 run_forever.py
start
start
after 1s stop
after 2s stop
耗时:2.0064s

  • loop.run_until_complete 方法本身也是调用 loop.run_forever 方法,然后通过回调函数调用 loop.stop 方法实现的。

loop.call_soon

事件循环的 call_soon 方法可以将普通函数作为任务加入到事件循环并立即排定任务的执行顺序。

import asyncio
import time

def hello(name):          # 普通函数
    print('[hello] Hello, {}'.format(name))

async def work(t, name):  # 协程函数
    print('[work ] start', name)
    await asyncio.sleep(t)
    print('[work ] {} after {}s stop'.format(name, t))

def main():
    loop = asyncio.get_event_loop()
    # 向事件循环中添加任务
    asyncio.ensure_future(work(1, 'A'))     # 第 1 个执行
    # call_soon 将普通函数当作 task 加入到事件循环并排定执行顺序
    # 该方法的第一个参数为普通函数名字,普通函数的参数写在后面
    loop.call_soon(hello, 'Tom')            # 第 2 个执行
    # 向事件循环中添加任务
    loop.create_task(work(2, 'B'))          # 第 3 个执行
    # 阻塞启动事件循环,顺便再添加一个任务
    loop.run_until_complete(work(3, 'C'))   # 第 4 个执行

if __name__ == '__main__':
    main()

运行程序:

$ python3 call_soon.py
[work ] start A
[hello] Hello, Tom
[work ] start B
[work ] start C
[work ] A after 1s stop
[work ] B after 2s stop
[work ] C after 3s stop

loop.call_later

  • 此方法同 loop.call_soon 一样,可将普通函数作为任务放到事件循环里,不同之处在于此方法可延时执行,第一个参数为延时时间。
import asyncio
import functools

def hello(name):            # 普通函数
    print('[hello]  Hello, {}'.format(name))

async def work(t, name):    # 协程函数
    print('[work{}]  start'.format(name))
    await asyncio.sleep(t)
    print('[work{}]  stop'.format(name))

def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(work(1, 'A'))         # 任务 1
    loop.call_later(1.2, hello, 'Tom')          # 任务 2
    loop.call_soon(hello, 'Kitty')              # 任务 3
    task4 = loop.create_task(work(2, 'B'))      # 任务 4
    loop.call_later(1, hello, 'Jerry')          # 任务 5
    loop.run_until_complete(task4)

if __name__ == '__main__':
    main()

运行程序:

$ python3 call_later.py
[workA]  start
[hello]  Hello, Kitty
[workB]  start
[hello]  Hello, Jerry
[workA]  stop
[hello]  Hello, Tom
[workB]  stop

毫无疑问,这五个任务在一个事件循环里是顺序执行,遇到阻塞执行下一个,程序执行顺序如下:

-> 首先执行任务一,打印一行后阻塞 1 秒,执行任务二

-> 任务二是 call_later 1.2 秒,就相当于一个 1.2 秒的 asyncio.sleep

-> 注意,call_later 这个延时 1.2 秒是事件循环启动时就开始计时的

-> 任务二阻塞,执行任务三,这个简单,打印一行就完事儿

-> 接着执行任务四,打印一行后阻塞 2 秒

-> 接着执行任务五,还是 call_later 1 秒,阻塞

-> 以上是五个任务第一轮的执行情况

第二轮开始前,CPU 一直候着,现在还有 4 个任务,任务三已完成

-> 第一个发出执行信号的是任务五,它只阻塞 1 秒

-> 上面已经说了,这个 1 秒是从事件循环启动时开始算

-> 所以这个阻塞肯定比任务一的阻塞 1 秒先结束

-> CPU 执行完任务五,任务一也阻塞结束了,执行之

-> 然后是任务二,最后是任务四

-> 第二轮打印了 4 行,全部任务完成,停止事件循环

loop.call_at & loop.time

  • call_soon 立刻执行,call_later 延时执行,call_at 在某时刻执行
  • loop.time 就是事件循环内部的一个计时方法,返回值是时刻,数据类型是 float

在 call_later.py 文件的基础上修改 main 函数中的代码如下,写入 call_at.py 文件:

def main():
    loop = asyncio.get_event_loop()
    start = loop.time()                         # 事件循环内部时刻
    asyncio.ensure_future(work(1, 'A'))         # 任务 1
    # loop.call_later(1.2, hello, 'Tom')
    # 上面注释这行等同于下面这行
    loop.call_at(start+1.2, hello, 'Tom')       # 任务 2
    loop.call_soon(hello, 'Kitty')              # 任务 3
    task4 = loop.create_task(work(2, 'B'))      # 任务 4
    # loop.call_later(1, hello, 'Jerry')
    # 上面注释这行等同于下面这行
    loop.call_at(start+1, hello, 'Jerry')       # 任务 5

    loop.run_until_complete(task4)

运行文件结果与 call_later.py 一致,不再展示。

这三个 call_xxx 方法的作用都是将普通函数作为任务排定到事件循环中,返回值都是 asyncio.events.TimerHandle 实例,

  • 注意它们不是协程任务 ,不能作为 loop.run_until_complete 的参数。

协程锁

按照字面意思来看,asyncio.lock 应该叫做异步 IO 锁,之所以叫协程锁,是因为它通常使用在子协程中,其作用是将协程内部的一段代码锁住,直到这段代码运行完毕解锁。

  • 协程锁的固定用法是使用 async with 创建协程锁的上下文环境,将代码块写入其中。
import asyncio

l = []
lock = asyncio.Lock()   # 协程锁

async def work(name):
    print('lalalalalalalala')     # 打印此信息是为了测试协程锁的控制范围
    # 这里加个锁,第一次调用该协程,运行到这个语句块,上锁
    # 当语句块结束后解锁,开锁前该语句块不可被运行第二次
    # 如果上锁后有其它任务调用了这个协程函数,运行到这步会被阻塞,直至解锁
    # with 是普通上下文管理器关键字,async with 是异步上下文管理器关键字
    # 能够使用 with 关键字的对象须有 __enter__ 和 __exit__ 方法
    # 能够使用 async with 关键字的对象须有 __aenter__ 和 __aexit__ 方法
    # async with 会自动运行 lock 的 __aenter__ 方法,该方法会调用 acquire 方法上锁
    # 在语句块结束时自动运行 __aexit__ 方法,该方法会调用 release 方法解锁
    # 这和 with 一样,都是简化 try ... finally 语句
    async with lock:
        print('{} start'.format(name))  # 头一次运行该协程时打印
        if 'x' in l:                    # 如果判断成功
            return name                 # 直接返回结束协程,不再向下执行
        await asyncio.sleep(0); print('----------')  # 阻塞 0 秒,切换协程
        l.append('x')
        print('{} end'.format(name))
        return name

async def one():
    name = await work('one')
    print('{} ok'.format(name))

async def two():
    name = await work('two')
    print('{} ok'.format(name))

def main():
    loop = asyncio.get_event_loop()
    tasks = asyncio.wait([one(), two()])
    loop.run_until_complete(tasks)

if __name__ == '__main__':
    main()

运行程序如下:

$ python3 async_lock.py
lalalalalalalala
one start
lalalalalalalala
----------
one end
one ok
two start
two ok

小结

  • 取消任务的两种方法
  • loop.run_forever 无限循环
  • 将普通函数作为任务注入事件循环的三种方法
  • loop.time 事件循环内部时间
  • asyncio.lock
Nemo版权所有丨如未注明,均为原创丨本网站采用BY-NC-SA协议进行授权,转载请注明转自:https://kanghaov.com/304.html
https://secure.gravatar.com/avatar/9fd8359b8faa6f7789f9623ba6041e4a?s=256&d=identicon&r=g

kanghaov

文章作者

推荐文章

发表评论

textsms
account_circle
email

Nemo

笔记-Python中asyncio与异步编程
从线程到协程 多进程与多线程 所谓多进程,就是系统可以同时运行多个任务; 在操作系统中,每个任务就是一个进程。每个进程至少做一件事,多数进程会做很多事; 在一个进程中,就有很…
扫描二维码继续阅读
2019-08-20