网址

  1. 实验楼_python异步编程
  2. Python异步IO之协程
  3. Python黑魔法 — 异步IO
  4. asyncio 学习笔记
  5. async with 和 async for

1.GIL

线程资源共用,所以容易造成预想不到的效果,也就是说线程是不安全的.
如何解决线程安全问题?

CPython解释器使用了加锁的方法。每个进程有一把锁,启动线程先加锁,结束线程释放锁

打个比方,进程是一个厂房,厂房大门是开着的,门内有锁,工人进入大门后可以在内部上锁。厂房里面有10个车间对应10个线程,每个CPU就是一个工人。

进程 – 厂房

线程 – 车间

CPU – 工人

GIL(Global Interpreter Lock)全局锁就相当于厂房规定:工人要到车间工作,从厂房大门进去后要在里面反锁,完成工作后开锁出门,下一个工人再进门上锁。也就是说,任意时刻厂房里只能有一个工人,但这样就保证了工作的安全性,这就是GIL的原理。

当然了,GIL的存在有很多其它益处,包括简化 CPython 解释器和大量扩展的实现。

2.异步

所谓的异步,就是CPU在当前线程阻塞时可以去其它线程中工作,不管怎么设计,在一个线程内部代码都是顺序执行的,遇到IO都得阻塞,所谓的非阻塞,是遇到当前线程阻塞时,CPU去其它线程工作

  1. 异步:多任务,==多个任务之间执行没有先后顺序==,可以同时运行,执行的先后顺序不会有什么影响,存在的多条运行主线
  2. 同步:多任务,==多个任务之间执行的时候要求有先后顺序==,必须一个先执行完成之后,另一个才能继续执行,只有一个主线
  3. 阻塞:从调用者的角度出发,如果在调用的时候,被卡住,不能再继续向下运行,需要等待,就说是阻塞。
  4. 非阻塞:从调用者的角度出发,如果在调用的时候,没有被卡住,能够继续向下运行,无需等待,就说是非阻塞。

同异步与阻塞的区别:

  • 同步和异步关注的是消息通信机制;
  • 阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。

3.协程

协程是在线程的基础上编写,由程序员决定代码执行顺序、可以互相影响的高耦合度代码的一种高级程序设计模式。

生成器函数的执行结果是生成器,注意这里所讲的“执行结果”不是函数的 return值。生成器终止时必定抛出 Stoplteration异常,for循环可以捕获此异常,异常的 value 属性值为生成器函数的 return值。

生成器会在yield语句处暂停,这是至关重要的,未来协程中的IO阻塞就出现在这里.

协程有四种存在状态:

  1. GEN_CREATED
    创建完成,等待执行
  2. GEN _RUNNING
    解释器正在执行(这个状态在下面的示例程序中无法看到)
  3. GEN_SUSPENDED
    在yield表达式处暂停
  4. GEN_CLOSE
    执行结束,生成器停止

预先激活生成器(或协程)可以使用next方法,也可以使用生成器的send方法发送None值:g.send(None)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 预激生成器装饰器
import inspect
from functools import wraps


def deco(func):
@wraps(func)
def wrapper(*args,**kw):
g = func(*args,**kw)
next(g)
return g
return wrapper

@deco
def gen():
i = 'hyl'
while True:
try:
value = yield i
except ValueError:
print('over')
i = value


g = gen()
print(inspect.getgeneratorstate(g))

前文提到Stoplteration异常的value属性值为生成器(协程)函数的 return 值,我们可以在使用协程时捕获异常并得到这个值。举例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import inspect
from functools import wraps


def deco(func):
@wraps(func)
def wrapper(*args,**kw):
g = func(*args,**kw)
next(g)
return g
return wrapper

@deco
def gen():
i = []
while True:
value = yield
if value == 'close':
break
i.append(value)
return i


g = gen()
g.send('hello')
g.send('123')
g.send('close')
# Traceback (most recent call last):
# File "D:\trainingfile\training2.py", line 27, in <module>
# g.send('close')
# StopIteration: ['hello', '123']
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import inspect
from functools import wraps


def deco(func):
@wraps(func)
def wrapper(*args,**kw):
g = func(*args,**kw)
next(g)
return g
return wrapper

@deco
def gen():
i = []
while True:
value = yield
if value == 'close':
break
i.append(value)
return i


g = gen()
g.send('hello')
g.send('123')

try:
g.send('close')
except StopIteration as e:
print(e.value)
# ['hello', '123']

4.生成器注意事项

  1. 当一个生成器对象被销毁时,或者主程序结束的时候会抛出一个GeneratorExit异常。
  2. GeneratorExit异常的产生意味着生成器对象的生命周期已经结束。因此,一旦产生了GeneratorExit异常,生成器方法后续执行的语句中,不能再有yield语句,否则会产生RuntimeError。
  3. throw:用来向生成器函数送入一个异常,如果生成器处理了这个异常,代码会向前执行到下一个yield,产生的值成为调用throw方法的返回值。
  4. close方法会在生成器对象方法的挂起处抛出一个GeneratorExit异常 。

5.yield from

yield from的两大优势

  1. 避免嵌套循环
  2. 转移控制权

yield from语句可以替代for循环,避免了嵌套循环.同yield一样,有 yield from语句的函数叫做协程函数或生成器函数.

yield from后面接收一个可迭代对象,在协程中,==可迭代对象往往是协程对象,这样就形成了嵌套协程==.

1
2
3
4
5
6
def func():
yield from range(1,10)
yield from [1,2,3]

for each in func():
print(each)

yield from允许子生成器直接从调用者接收其发送的信息或者抛出调用时遇到的异常,并且返回给委派生产器一个值。

所谓转移控制权就是 yield from语法可以==将子生成器的控制权交给调用方main函数==,在main函数内部创建父生成器C,控制 c.send方法传值给子生成器.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from functools import wraps
import time
from faker import Faker


# 预激协程装饰器
def coroutine(func):
@wraps(func)
def wrapper(*args, **kw):
g = func(*args, **kw)
next(g)
return g
return wrapper

# 子生成器函数,这个生成器是真正做事的生成器
def sub_coro():
l = []
while True:
value = yield
if value == 'CLOSE':
break
l.append(value)
return sorted(l)


# 使用预激协程装饰器
@coroutine
# 带有 yield from 语句的父生成器函数
def dele_coro():
# while True 可以多次循环,每次循环会创建一个新的子生成器 sub_coro()
# 这里 while 只循环一次,这是由调用方,也就是 main 函数决定的
# while 循环可以捕获函数本身创建的父生成器终止时触发的 StopIteration 异常
while True:
# yield from 会自动预激子生成器 sub_coro()
# 所以 sub_coro 在定义时不可以使用预激协程装饰器
# yield from 将捕获子生成器终止时触发的 StopIteration 异常
# 并将异常的 value 属性值赋值给等号前面的变量 l
# 也就是 l 变量的值等于 sub_coro 函数的 return 值
# yield from 还实现了一个重要功能
# 就是父生成器的 send 方法将发送值给子生成器
# 并赋值给子生成器中 yield 语句等号前面的变量 value
l = yield from sub_coro()
print('排序后的列表:', l)
print('------------------')

# 调用父生成器的函数,也叫调用方
def main():
# 生成随机国家代号的方法
fake = Faker().country_code
# 嵌套列表,每个子列表中有三个随机国家代号(字符串)
nest_country_list = [[fake() for i in range(3)] for j in range(3)]

for country_list in nest_country_list:
print('国家代号列表:', country_list)
c = dele_coro() # 创建父生成器
for country in country_list:
c.send(country) # 父生成器的 send 方法将国家代号发送给子生成器
# CLOSE 将终止子生成器中的 while 循环
# 子生成器的 return 值赋值给父生成器 yield from 语句中等号前面的变量 l
c.send('CLOSE')

if __name__ == '__main__':
main()

# 国家代号列表: ['SA', 'FJ', 'PK']
# 排序后的列表: ['FJ', 'PK', 'SA']
# ------------------
# 国家代号列表: ['VA', 'KP', 'MW']
# 排序后的列表: ['KP', 'MW', 'VA']
# ------------------
# 国家代号列表: ['ZW', 'UA', 'BD']
# 排序后的列表: ['BD', 'UA', 'ZW']
# ------------------

注意:

  1. yield from 会自动预激生成器
  2. 父生成器的l = yield from sub_coro()中的l就会接受子生成器的return值.
    也就是说,==l只有在子生成器结束的时候才会被赋值==.
  3. 父生成器就是一个通道而已,在上面代码中,通道没有做任何额外的事情

使用通道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def coroutine(func):
def wrapper(*args, **kw):
g = func(*args, **kw)
next(g)
return g
return wrapper

def add():
res = 0
while True:
i = yield
if i == 'close':
break
else:
res += i
return res

@coroutine
def parent():
while True:
a = yield from add()
return a # 这行只执行了一次,就是在最后的时候


def main():
gen = parent()
gen.send(1)
gen.send(2)
gen.send(3)
gen.send(4)
x = gen.send('close')
print(x)

try:
main()
except StopIteration as e:
print(e) # 10

不使用通道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def coroutine(func):
def wrapper(*args, **kw):
g = func(*args, **kw)
next(g)
return g
return wrapper

@coroutine
def add():
res = 0
while True:
i = yield
if i == 'close':
break
else:
res += i
return res


def main():
gen = add()
gen.send(1)
gen.send(2)
gen.send(3)
gen.send(4)
x = gen.send('close')
print(x)

try:
main()
except StopIteration as e:
print(e) # 10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def coroutine(func):
def wrapper(*args, **kw):
g = func(*args, **kw)
next(g)
return g
return wrapper


def sort_list():
res = []
while True:
i = yield
if i != 'close':
res.append(i)
else:
break
return sorted(res)

@coroutine
def parent():
while True:
a = yield from sort_list()
return a

def main():
g = parent()
g.send(1)
g.send(5)
g.send(59)
g.send(7)
g.send('close')

try:
main()
except StopIteration as e:
print(e) # [1, 5, 7, 59]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def avg():
tot = 0
count = 0
while True:
a = yield
if a != None:
tot += a
count += 1
else:
return tot/count


def proxies(aDict,key):
while True:
aDict[key] = yield from avg()


def main():
data = {'hyl':[95,96,97],'dsz':[88,90]}
res = {}

for person,score in data.items():
gen = proxies(res,person)
next(gen)

for each in score:
gen.send(each)
gen.send(None)

print(res)

if __name__ == '__main__':
main() # {'hyl': 96.0, 'dsz': 89.0}

而yield from功能还不止于此,它还有一个主要的功能是省去了很多异常的处理,不再需要我们手动编写,其内部已经实现大部分异常处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def generator_1():
total = 0
while True:
x = yield
print('加',x)
if not x:
break
total += x
return total

def generator_2(): # 委托生成器
while True:
total = yield from generator_1() # 子生成器
print('加和总数是:',total)

def main(): # 调用方
g1 = generator_1()
g1.send(None)
g1.send(2)
g1.send(3)
g1.send(None)

main()
# 加 2
# 加 3
# 加 None
# Traceback (most recent call last):
# File "D:\trainingfile\training4.py", line 23, in <module>
# main()
# File "D:\trainingfile\training4.py", line 21, in main
# g1.send(None)
# StopIteration: 5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def generator_1():
total = 0
while True:
x = yield
print('加',x)
if not x:
break
total += x
return total

def generator_2(): # 委托生成器
while True:
total = yield from generator_1() # 子生成器
print('加和总数是:',total)

def main(): # 调用方
g2 = generator_2()
g2.send(None)
g2.send(2)
g2.send(3)
g2.send(None)

main()
# 加 2
# 加 3
# 加 None
# 加和总数是: 5

可见yield from封装了处理常见异常的代码。对于g2即便传入None也不报异常,其中total = yield from generator_1()返回给total的值是generator_1()最终的return total

概念:

  1. 子生成器:yield from后的generator_1()生成器函数是子生成器
  2. 委托生成器:generator_2()是程序中的委托生成器,它负责委托子生成器完成具体任务。
  3. 调用方:main()是程序中的调用方,负责调用委托生成器。

yield from在其中的作用是:建立调用方和子生成器的通道

  1. 在上述代码中main()每一次在调用send(value)时,value不是传递给了委托生成器generator_2(),而是借助yield from传递给了子生成器generator_1()中的yield
  2. 同理,子生成器中的数据也是通过yield直接发送到调用方main()中。

6.线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time
from concurrent import futures

def printer(num):
time.sleep(1)
print(num)


def main():
with futures.ThreadPoolExecutor(5) as executor:
executor.map(printer,range(4))

if __name__ == '__main__':
main()

注意:executor.map(printer,range(4))是将range(4)迭代出来的元素作为printer的参数.并不是整个range(4)作为priner的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
from concurrent import futures

def printer(num):
time.sleep(1)
print(num)

def main():
with futures.ThreadPoolExecutor(5) as executor:
executor.map(printer,((1,2),(3,4),(5,6),(7,8),(9,10)))

if __name__ == '__main__':
main()
# (3, 4)
# (1, 2)
# (7, 8)
# (5, 6)
# (9, 10)

executor.__exit__方法会调用executor.shutdown(wait=True)方法,等待所有线程都执行完毕后关闭线程池

future 是一种对象,表示异步执行的操作。Future封装待完成的操作

7.支持获取返回值的线程

使用threading自定义一个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
import random
import threading

class Test(threading.Thread):
def foo(self):
time.sleep(2)
return random.randint(0,5)

def run(self):
self._return = self.foo()

def get_result(self):
return self._return


test1 = Test()
test2 = Test()
test1.start()
test2.start()

test1.join()
test2.join()
print(test1.get_result())
print(test2.get_result())

使用threaing指定一个更抽象的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from threading import Thread
import time

def printer(num):
while True:
time.sleep(1)
print(num)

class ThreadWithReturnValue(Thread):
def run(self):
if self._target:
self._return = self._target(*self._args, **self._kwargs)

def get_result(self):
return self._return

test = ThreadWithReturnValue(target=printer,args=(2,))
test.start()

使用future.ThreadPoolExecutor的map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
from concurrent import futures

def func(num):
print('---hyl')
time.sleep(2)
return ('this is return',num)


with futures.ThreadPoolExecutor(3) as executor:
result = executor.map(func,range(6))
print(result)
# result 存储了所有future的返回值,是一个生成器
# <generator object Executor.map.<locals>.result_iterator at 0x00000170C562C9A8>

# 迭代这个生成器,获得返回值
for each in result:
print(each)

# ---hyl
# ---hyl
# ---hyl
# <generator object Executor.map.<locals>.result_iterator at 0x000001E855EAD9A8>
# ---hyl
# ---hyl
# ---hyl
# ('this is return', 0)
# ('this is return', 1)
# ('this is return', 2)
# ('this is return', 3)
# ('this is return', 4)
# ('this is return', 5)

发现(‘this is return’, 0)都是最后才打印的,说明:
使用with语句就能起到join()的阻塞作用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import time
from concurrent import futures

def func(num):
print('---hyl')
time.sleep(2)
return ('this is return',num)


executor = futures.ThreadPoolExecutor(3)
result = executor.map(func,range(6))
print('*------------')

# 迭代这个生成器,获得返回值
for each in result:
print(each)

time.sleep(20)

print('**************************')

# ---hyl
# ---hyl
# ---hyl
# *------------(等待2秒后执行下面代码)
# ---hyl
# ('this is return', 0)
# ---hyl
# ('this is return', 1)
# ---hyl
# ('this is return', 2)
# ('this is return', 3)
# ('this is return', 4)
# ('this is return', 5)(等待20秒后执行下面代码)
# **************************

发现print('**************************')是最后执行的,这说明for循环阻塞了

说明必须等到result这个生成器所有之填充完毕后才会退出for循环

总结:

  1. with能够阻塞,必须等待result = executor.map(func,range(6))才能退出with语句
  2. result也能阻塞,必须等待result这个生成器有值才会向下执行代码

Executor.map还有个特性比较有用,那就是这个函数返回==结果的顺序于调用开始的顺序是一致的==。如果第一个调用生成结果用时10秒,其他调用只用1秒,代码会阻塞10秒,获取map方法返回的生成器产出的第一个结果。

守护进程

  • 一般创建了子进程,主线程都会等子进程完全执行完,主进程才会结束
  • 守护线程就是主线程不等子进程,主线程做完程序立马结束。

设置守护进程

1
2
3
#注意一定要把daemon设置在 start 之前设置
p=Process(target=func)
p.daemon=True
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from multiprocessing import Process
import time


def func():
print('*****')
time.sleep(10)
print('我还活着')


def main():
p = Process(target=func)
p.daemon = True
p.start()

if __name__ == '__main__':
print('-----------')
main()
print('end')

# -----------
# end

还可以使用join():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process
import time


def func():
print('*****')
time.sleep(10)
print('我还活着')


if __name__ == '__main__':
print('-----------')

p = Process(target=func)
p.daemon = True
p.start()
p.join()

print('end')

# -----------
# *****
# 我还活着
# end

守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

守护子进程和非守护子进程可以并存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process
from threading import Thread
import time,os

def foo():
print(123)
time.sleep(1)
print("end123")

def bar():

print(456)
time.sleep(3)
print("end456")


if __name__ == '__main__':
p1 = Process(target=foo)
p2 = Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
print("main-------")

# main-------
# 456
# end456

守护线程

  1. 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,

  2. 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Process
from threading import Thread
import os,time,random

def task():
for each in range(1000):
print(each)
print('%s is done' %os.getpid())

if __name__ == '__main__':
t = Thread(target=task)
t.daemon = True
t.start()
print('主')

# 0
# 主
# 1

守护子线程非守护子进程并存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from threading import Thread
import time

def foo():
print(123)
time.sleep(100)
print("end123")

def bar():
print(456)
time.sleep(3)
print("end456")

if __name__ == '__main__':
t1 = Thread(target=foo)
t2 = Thread(target=bar)

t1.daemon=True

t1.start()
t2.start()
print("main-------")

# 123
# 456
# main-------
# end456

注意:

  • 主线程在其他非守护线程运行完毕后才算运行完毕
  • 可能发生其他非守护线程过晚结束,守护线程执行完毕的现象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from threading import Thread
import time

def foo():
print(123)
time.sleep(1)
print("end123")

def bar():
print(456)
time.sleep(10)
print("end456")

if __name__ == '__main__':
t1 = Thread(target=foo)
t2 = Thread(target=bar)

t1.daemon=True

t1.start()
t2.start()
print("main-------")

# 主线程的结束是指:所有非守护线程的结束
# 这里的非守护线程过晚结束了

# 123
# 456
# main-------
# end123
# end456

8.asyncio模块

基础概念

@asyncio.coroutine:asyncio模块中的装饰器,用于将一个生成器声明为协程。

yield from 其实就是等待另外一个协程的返回。

简单来说:
@asyncio.coroutine装饰器是协程函数的标志,我们需要在每一个任务函数前加这个装饰器,并在函数中使用yield from。

==在协程中yield from后面必须是子生成器函数==

在Python 3.5开始引入了新的语法async和await,以简化并更好地标识异步IO。

  • 使用async代替@asyncio.coroutine
  • 使用await代替yield from

我们可以使用async修饰将普通函数和生成器函数包装成异步函数和异步生成器。

  • 异步函数(协程)

    1
    2
    async def async_function():
    return 1
  • 异步生成器

    1
    2
    async def async_generator():
    yield 1

所以,函数可以分成四种:

1
2
3
4
5
import types
print(type(function) is types.FunctionType)
print(type(generator()) is types.GeneratorType)
print(type(async_function()) is types.CoroutineType)
print(type(async_generator()) is types.AsyncGeneratorType)

在协程函数中,可以通过await语法来挂起自身的协程,并等待另一个协程完成直到返回结果:
await后面的对象需要是一个Awaitableawait的目的是等待协程控制流的返回,而实现暂停并挂起函数的操作是yield

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def async_function():
return 1

async def await_coroutine():
result = await async_function()
print(result)

def run(coroutine):
try:
coroutine.send(None)
except StopIteration as e:
return e.value

run(await_coroutine())
# 1
  • event_loop 事件循环:
    程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
  • coroutine 协程:
    协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  • task 任务:
    一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  • future:
    代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
    (task对象是Future类的子类)
  • async/await 关键字:
    python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口

走个流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 使用同步方式编写异步功能
import time
import asyncio


async def taskIO_1():
print('开始运行IO任务1...')
await asyncio.sleep(2) # 假设该任务耗时2s
print('IO任务1已完成,耗时2s')
return taskIO_1.__name__


async def taskIO_2():
print('开始运行IO任务2...')
await asyncio.sleep(3) # 假设该任务耗时3s
print('IO任务2已完成,耗时3s')
return taskIO_2.__name__


async def main(): # 调用方
tasks = [taskIO_1(), taskIO_2()] # 把所有任务添加到task中
done,pending = await asyncio.wait(tasks) # 子生成器
for r in done: # done和pending都是一个任务,所以返回结果需要逐个调用result()
print('协程无序返回值:'+r.result())


if __name__ == '__main__':
start = time.time()
loop = asyncio.get_event_loop() # 创建一个事件循环对象loop
loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
print('所有IO任务总耗时%.5f秒' % float(time.time()-start))
# 开始运行IO任务1...
# 开始运行IO任务2...
# IO任务1已完成,耗时2s
# IO任务2已完成,耗时3s
# 协程无序返回值:taskIO_1
# 协程无序返回值:taskIO_2
# 所有IO任务总耗时3.00328秒

使用asyncio.coroutine装饰器

Python3.3的yield from语法可以把生成器的操作委托给另一个生成器,生成器的调用方可以直接与子生成器进行通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def sub_gen():
yield 1
yield 2
yield 3

def gen():
return (yield from sub_gen())

def main():
for val in gen():
print(val)
# 1
# 2
# 3

利用这一特性,使用yield from能够编写出类似协程效果的函数调用,在3.5之前,asyncio正是使用@asyncio.coroutine和yield from语法来创建协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

@asyncio.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y

@asyncio.coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

然而,用yield from容易在表示协程和生成器中混淆,没有良好的语义性,所以在Python 3.5推出了更新的async/await表达式来作为协程的语法。

因此类似以下的调用是等价的:

1
2
3
4
5
async with lock:
...

with (yield from lock):
...
1
2
3
4
5
def main():
return (yield from coro())

def main():
return (await coro())

初步使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

future = asyncio.Future()

async def coro1():
print('--- coro1 ---')
await asyncio.sleep(.1)
future.set_result('data')

async def coro2():
print('--- coro2 ---')
print(await future)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1(),
coro2(),
]))
loop.close()

# data

两个协程在在事件循环中,协程coro1在执行第一句后挂起自身切到asyncio.sleep,而协程coro2一直等待future的结果,让出事件循环,计时器结束后coro1执行了第二句设置了future的值,被挂起的coro2恢复执行,打印出future的结果’data’。

future可以被await证明了future对象是一个Awaitable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 使用同步方式编写异步功能
import time
import asyncio


@asyncio.coroutine # 标志协程的装饰器
def taskIO_1():
print('开始运行IO任务1...')
yield from asyncio.sleep(2) # 假设该任务耗时2s
print('IO任务1已完成,耗时2s')
return taskIO_1.__name__


@asyncio.coroutine # 标志协程的装饰器
def taskIO_2():
print('开始运行IO任务2...')
yield from asyncio.sleep(3) # 假设该任务耗时3s
print('IO任务2已完成,耗时3s')
return taskIO_2.__name__


@asyncio.coroutine # 标志协程的装饰器
def main(): # 调用方
tasks = [taskIO_1(), taskIO_2()] # 把所有任务添加到task中
done,pending = yield from asyncio.wait(tasks) # 子生成器
for r in done: # done和pending都是一个任务,所以返回结果需要逐个调用result()
print('协程无序返回值:'+r.result())


if __name__ == '__main__':
start = time.time()
loop = asyncio.get_event_loop() # 创建一个事件循环对象loop
try:
loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
finally:
loop.close() # 结束事件循环
print('所有IO任务总耗时%.5f秒' % float(time.time()-start))
# 开始运行IO任务1...
# 开始运行IO任务2...
# IO任务1已完成,耗时2s
# IO任务2已完成,耗时3s
# 协程无序返回值:taskIO_1
# 协程无序返回值:taskIO_2
# 所有IO任务总耗时3.00328秒
  1. 上面代码先通过get_event_loop()获取了一个标准事件循环loop(因为是一个,所以协程是单线程)

  2. 然后,我们通过run_until_complete(main())来运行协程(此处把调用方协程main()作为参数,调用方负责调用其他委托生成器),run_until_complete的特点就像该函数的名字,直到循环事件的所有事件都处理完才能完整结束。

  3. 进入调用方协程,我们把多个任务[taskIO_1()和taskIO_2()]放到一个task列表中,可理解为打包任务。

  4. 使用==asyncio.wait(tasks)来获取一个awaitable objects==,即可等待对象的集合(此处的aws是协程的列表),并发运行传入的aws,同时通过yield from返回一个包含(done, pending)的元组,done表示已完成的任务列表,pending表示未完成的任务列表;

    • 如果使asyncio.as_completed(tasks)则会按完成顺序生成协程的迭代器(常用于for循环中),因此当你用它迭代时,会尽快得到每个可用的结果。
    • 此外,当轮询到某个事件时(如taskIO_1()),直到遇到该任务中的yield from中断,开始处理下一个事件(如taskIO_2())),当yield from后面的子生成器完成任务时,该事件才再次被唤醒
  5. 因为done里面有我们需要的返回结果,但它目前还是个任务列表,所以要取出返回的结果值,我们遍历它并逐个调用result()取出结果即可。

    (注:对于asyncio.wait()和asyncio.as_completed()返回的结果均是先完成的任务结果排在前面,所以此时打印出的结果不一定和原始顺序相同,但使用gather()的话可以得到原始顺序的结果集)

  6. 最后我们通过loop.close()关闭事件循环。

使用async和await实现协程

asyncio的使用可分三步走:

  1. 创建事件循环
  2. 指定循环模式并运行
  3. 关闭循环

简单来说就是:

  1. 将协程对象转为task任务对象
  2. 定义一个事件循环对象容器用来存放task
  3. 将task任务扔进事件循环对象中并触发

1.定义一个协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time
import asyncio

async def do_some_work(x):
print('Waiting: ', x)

start = time.time()

coroutine = do_some_work(2)

# coroutine是协程,不是函数,所以do_some_work(2)返回的是协程对象
print(coroutine)
# <coroutine object do_some_work at 0x0000020BDB2EAAC8>

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

print('TIME: ', time.time() - start)
# Waiting: 2
# TIME: 0.0010001659393310547
  • 通过async关键字定义一个协程(coroutine),协程也是一种对象。协程不能直接运行,需要把协程加入到事件循环(loop),事件循环在适当的时候调用协程。
  • asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。

2.创建一个task

在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象

==task对象是Future类的子类==。保存了协程运行后的状态,用于未来获取协程的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time

async def do_some_work(x):
print('Waiting: ', x)

start = time.time()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()

# task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)
print(task)
# <Task pending coro=<do_some_work() running at D:\trainingfile\training6.py:5>>
# 创建task后,task在加入事件循环之前是pending状态

loop.run_until_complete(task)
print(task)
# <Task finished coro=<do_some_work() done, defined at D:\trainingfile\training6.py:5> result=None>
# 打印finished状态。

print('TIME: ', time.time() - start)
# TIME: 0.0019979476928710938
  • asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task
  • run_until_complete的参数是一个futrue对象。
    当传入一个协程,其内部会自动封装成task,==task是Future的子类==。isinstance(task, asyncio.Future)将会输出True。

3.绑定回调

绑定回调,在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import asyncio

async def do_some_work(x):
print('Waiting: ', x)
return 'Done after {}s'.format(x)

def callback(future):
print('Callback: ', future.result())

start = time.time()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()

task = asyncio.ensure_future(coroutine)
print(task)

task.add_done_callback(callback)
loop.run_until_complete(task)

print('TIME: ', time.time() - start)
print(task)
# <Task pending coro=<do_some_work() running at D:\trainingfile\training5.py:4>>
# Waiting: 2
# Callback: Done after 2s
# TIME: 0.0010004043579101562
# <Task finished coro=<do_some_work() done, defined at D:\trainingfile\training5.py:4> result='Done after 2s'>

如果回调需要多个参数,可以通过偏函数导入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
import asyncio
import functools

async def do_some_work(x):
print('Waiting: ', x)
return 'Done after {}s'.format(x)

def callback(t, future):
print('Callback:', t, future.result())


start = time.time()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()

task = asyncio.ensure_future(coroutine)
print(task)

task.add_done_callback(functools.partial(callback, 'hyl'))
loop.run_until_complete(task)

print('TIME: ', time.time() - start)
print(task)
# <Task pending coro=<do_some_work() running at D:\trainingfile\training1.py:5>>
# Waiting: 2
# Callback: hyl Done after 2s
# TIME: 0.00099945068359375
# <Task finished coro=<do_some_work() done, defined at D:\trainingfile\training1.py:5> result='Done after 2s'>

coroutine执行结束时候会调用回调函数。并通过参数future获取协程执行的结果。
我们创建的task和回调里的future对象,实际上是同一个对象。

4.future 与 result

  • 前面不绑定回调的例子中,我们可以看到task有fiinished状态。
  • 当task处于fiinished状态,我们可以直接使用task的result方法获取返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time
import asyncio

async def do_some_work(x):
print('Waiting {}'.format(x))
return 'Done after {}s'.format(x)

start = time.time()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()

task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task ret: {}'.format(task.result()))
print('TIME: {}'.format(time.time() - start))
# Waiting 2
# Task ret: Done after 2s
# TIME: 0.0009996891021728516

5.阻塞和await

  • 使用async可以定义协程对象
  • 使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。
  • 协程遇到await,==事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行==。

我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time


async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)

start = time.time()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()

task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task ret: ', task.result())
print('TIME: ', time.time() - start)
# Waiting: 2
# Task ret: Done after 2s
# TIME: 2.001849412918091
  • 在 asyncio.sleep的时候,使用await让出控制权。
  • 即当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。现在我们的例子就用耗时的阻塞操作了。

6.并发和并行

asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。创建多个协程的列表,然后将这些协程注册到事件循环中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import asyncio
import time


async def do_some_work(x):
print('Waiting: ', x)

await asyncio.sleep(x)
return 'Done after {}s'.format(x)

start = time.time()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# 任务列表
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
# wait 会分别把各个协程包装进一个 Task 对象
loop.run_until_complete(asyncio.wait(tasks))


print('--------')

for task in tasks:
print('Task ret: ', task.result())

print('TIME: ', time.time() - start)
# Waiting: 1
# Waiting: 2
# Waiting: 4
# --------
# Task ret: Done after 1s
# Task ret: Done after 2s
# Task ret: Done after 4s
# TIME: 4.002715826034546
  • 总时间为4s左右。4s的阻塞时间,足够前面两个协程执行完毕。如果是同步顺序的任务,那么至少需要7s。此时我们使用了aysncio实现了并发。
  • asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一个task列表,后者接收一堆task。

wait()官方文档用法如下:

done, pending = await asyncio.wait(aws)

7.协程嵌套

嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
import time

async def do_some_work(x):
print('Waiting: ', x)

await asyncio.sleep(x)
return 'Done after {}s'.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

dones, pendings = await asyncio.wait(tasks)

for task in dones:
print('Task ret: ', task.result())


if __name__ == '__main__':
start = time.time()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', time.time() - start)
# Waiting: 1
# Waiting: 2
# Waiting: 4
# Task ret: Done after 1s
# Task ret: Done after 2s
# Task ret: Done after 4s
# TIME: 4.001962423324585

8.协程停止

future对象有几个状态:

  • Pending

    创建future的时候,task为pending

  • Running

    事件循环调用执行的时候就是running

  • Done

    调用完毕是done

  • Cancelled

    如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
import time

async def do_some_work(x):
print('Waiting: ', x)

await asyncio.sleep(x)
return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

start = time.time()

loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
for task in asyncio.Task.all_tasks():
print(task.cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()

print('TIME: ', time.time() - start)

启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。可以看到输出如下:

1
2
3
4
5
6
7
8
9
Waiting:  1
Waiting: 2
Waiting: 2
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
True
True
True
True
TIME: 0.8858370780944824

True表示cannel成功

9.不同线程的事件循环

  • 很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。
  • 当前线程创建一个事件循环,然后再新建一个线程,在新线程中启动事件循环。当前线程不会被block。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import time
from threading import Thread

def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

def more_work(x):
print('More work {}'.format(x))
time.sleep(x)
print('Finished more work {}'.format(x))

start = time.time()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)

# TIME: 0.0019989013671875
# More work 6
# Finished more work 6
# More work 3
# Finished more work 3

async with 和async for 用法说明

async with

异步上下文管理器指的是在enter和exit方法处能够暂停执行的上下文管理器

为了实现这样的功能,需要加入两个新的方法:__aenter__ 和__aexit__。这两个方法都要返回一个 awaitable类型的值。

1
2
3
4
5
6
class AsyncContextManager:
async def __aenter__(self):
await log('entering context')

async def __aexit__(self, exc_type, exc, tb):
await log('exiting context')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async with EXPR as VAR:
BLOCK

## 上面代码等同于下面:
mgr = (EXPR)
aexit = type(mgr).__aexit__
aenter = type(mgr).__aenter__(mgr)
exc = True

VAR = await aenter
try:
BLOCK
except:
if not await aexit(mgr, *sys.exc_info()):
raise
else:
await aexit(mgr, None, None, None)

async for

  • 异步迭代器就是能够在next方法中调用异步代码
  • 一个异步可迭代对象(asynchronous iterable)能够在迭代过程中调用异步代码

为了支持异步迭代:

  1. 一个对象必须实现__aiter__方法,该方法返回一个异步迭代器(asynchronous iterator)对象。
  2. 一个异步迭代器对象必须实现__anext__方法,该方法返回一个awaitable类型的值。
  3. 为了停止迭代,__anext__必须抛出一个StopAsyncIteration异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
class AsyncIterable:
def __aiter__(self):
return self

async def __anext__(self):
data = await self.fetch_data()
if data:
return data
else:
raise StopAsyncIteration

async def fetch_data(self):
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async for TARGET in ITER:
BLOCK
else:
BLOCK2


## 上面代码等同于下面:
iter = (ITER)
iter = type(iter).__aiter__(iter)
running = True
while running:
try:
TARGET = await type(iter).__anext__(iter)
except StopAsyncIteration:
running = False
else:
BLOCK
else:
BLOCK2

补充

1.基础知识补充

  • loop = asyncio.get_event_loop():
    每个线程中只能有一个事件循环,get_event_loop 方法会获取当前已经存在的事件循环,如果当前线程中没有,就会新建一个

  • loop.run_until_complete(coroutine):

    将协程对象注入到事件循环,协程的运行由事件循环控制。

    • 事件循环的 run_until_complete 方法会阻塞运行,直到任务全部完成。
    • 协程对象作为 run_until_complete 方法的参数,loop 会自动将协程对象包装成任务来运行。
    • 任务对象保存了协程运行后的状态,用于未来获取协程的结果。
    • 为什么要使用协程对象创建任务?
      因为在这个过程中 asyncio.Task 做了一些工作,包括预激协程、协程运行中遇到某些异常时的处理
  • 协程对象不能直接运行,必须放入事件循环中或者由 yield from 语句调用

  • 回调函数的最后一个参数是 future 或 task 对象,通过该对象可以获取协程返回值。如果回调需要多个参数,可以通过偏函数导入。

    1
    2
    3
    4
    5
    6
    def callback(name, task):
    print('[callback] Hello {}'.format(name))
    print('[callback] coroutine state: {}'.format(task._state))

    task = loop.create_task(coroutine)
    task.add_done_callback(functools.partial(callback, 'hyl'))

    其实多数情况下无需调用 task 的 add_done_callback 方法,可以直接把回调函数中的代码写入 await 语句后面

  • asyncio.sleep 与 time.sleep 是不同的

    • asyncio.sleep阻塞当前协程,即 corowork 函数的运行

      阻塞当前协程,CPU 可以在线程内的其它协程中执行

    • time.sleep会阻塞整个线程

  • 实际项目中,往往有多个协程创建多个任务对象,同时在一个 loop 里运行。
    为了把多个协程交给 loop,需要借助 asyncio.gather 方法。

    • asyncio.gather 方法中参数的顺序决定了协程的运行顺序(asyncio.wait也是按顺序执行)
    • 使用 asyncio.gather 方法,任务的 result 方法可以获得对应的协程函数的 return 值。

    多数情况下同样无需调用 task 的 result 方法获取协程函数的 return 值,因为事件循环的 run_until_complete 方法的返回值就是协程函数的 return 值

2.全流程解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import time
import asyncio

def four():
start = time.time()
async def corowork(name, t):
print('[corowork] Start coroutine', name)
await asyncio.sleep(t)
print('[corowork] Stop coroutine', name)
return 'Coroutine {} OK'.format(name)

loop = asyncio.get_event_loop()
coroutine1 = corowork('ONE', 3)
coroutine2 = corowork('TWO', 1)

task1 = loop.create_task(coroutine1)
task2 = loop.create_task(coroutine2)

gather = asyncio.gather(task1, task2)
loop.run_until_complete(gather)

print('[task1] ', task1.result())
print('[task2] ', task2.result())

end = time.time()
print('运行耗时:{:.4f}'.format(end - start))

four()
# [corowork] Start coroutine ONE
# [corowork] Start coroutine TWO
# [corowork] Stop coroutine TWO
# [corowork] Stop coroutine ONE
# [task1] Coroutine ONE OK
# [task2] Coroutine TWO OK
# 运行耗时:3.0175
  1. 首先运行 task1
  2. 打印 [corowork] Start coroutine ONE
  3. 遇到 asyncio.sleep 阻塞
  4. 释放 CPU 转到 task2 中执行
  5. 打印 [corowork] Start coroutine TWO
  6. 再次遇到 asyncio.sleep 阻塞
  7. 这次没有其它协程可以运行了,只能等阻塞结束
  8. task2 的阻塞时间较短,阻塞 1 秒后先结束,打印 [corowork] Stop coroutine TWO
  9. 又过了 2 秒,阻塞 3 秒的 task1 也结束了阻塞,打印 [corowork] Stop coroutine ONE
  10. 至此两个任务全部完成,事件循环停止
  11. 打印两个任务的 result
  12. 打印程序运行时间
  13. 程序全部结束

3.是否要关闭事件循环

事件循环有一个 stop 方法用来停止循环和一个 close 方法用来关闭循环。以上示例中都没有调用 loop.close 方法,似乎并没有什么问题。所以到底要不要调用 loop.close 呢?

简单来说,loop 只要不关闭,就还可以再次运行 run_until_complete 方法,关闭后则不可运行。有人会建议调用 loop.close,彻底清理 loop 对象防止误用,其实多数情况下根本没有这个必要。

4.asyncio.gather 和 asyncio.wait 的异同

  • 两者作用相同,都是将协程任务按顺序排定,再将返回值作为参数加入到事件循环中。

  • asyncio.wait 可以获取任务的执行状态(PENING & FINISHED),当有一些特别的需求例如在某些情况下取消任务,可以使用 asyncio.wait 方法。

5.取消任务的两种方法

  • loop.stop() : 事件循环的 stop 方法取消所有任务,停止事件循环
  • 任务的 cancel 方法也可以取消任务,而 asyncio.Task.all_tasks 方法可以获得事件循环中的全部任务

loop.stop

PENDING 状态的任务才能被取消,FINISHED 状态的任务已经完成,不能取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio

async def work(id, t):
print('Wroking...')
await asyncio.sleep(t)
print('Work {} done'.format(id))

def main():
loop = asyncio.get_event_loop()
coroutines = [work(i, i) for i in range(1, 4)]
try:
loop.run_until_complete(asyncio.gather(*coroutines))
except KeyboardInterrupt:
# 事件循环的 stop 方法取消所有任务,停止事件循环
loop.stop()
finally:
loop.close()

if __name__ == '__main__':
main()

# Wroking...
# Wroking...
# Wroking...
# Work 1 done
# ^C%
  1. 首先,id 为 1 的协程先启动运行
  2. 打印 Working…
  3. 遇到 IO 阻塞,释放 CPU ,CPU 去到 id 为 2 的协程中运行
  4. 同样首先打印 Working…
  5. 遇到 IO 阻塞,同样释放 CPU ,第三个协程开始运行,打印 Working…
  6. 以上步骤瞬间完成,这时候的 loop 中全部协程处于阻塞状态
  7. 一秒钟后,id 为 1 的协程结束阻塞
  8. 打印 Work 1 done
  9. 然后手动按下快捷键 Ctrl + C ,触发 KeyboardInterrupt 异常
  10. try except 语句捕获异常,执行 # 3 和 # 4
  11. 程序运行完毕

task.cancel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio

async def work(id, t):
print('Wroking...')
await asyncio.sleep(t)
print('Work {} done'.format(id))

def main():
loop = asyncio.get_event_loop()
coroutines = [work(i, i) for i in range(1, 4)]
try:
loop.run_until_complete(asyncio.gather(*coroutines))
except KeyboardInterrupt:
print()
# 每个线程里只能有一个事件循环
# 此方法可以获得事件循环中的所有任务的集合
# 任务的状态有 PENDING 和 FINISHED 两种
tasks = asyncio.Task.all_tasks()
for i in tasks:
print('取消任务:{}'.format(i))
# 任务的 cancel 方法可以取消未完成的任务
# 取消成功返回 True ,已完成的任务取消失败返回 False
print('取消状态:{}'.format(i.cancel()))
finally:
loop.close()


main()
# Wroking...
# Wroking...
# Wroking...

# 取消任务:<Task pending coro=<work() running at training1.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000026ED0CA3BE8>()]> cb=[gather.<locals>._done_callback() at F:\Anaconda3\lib\asyncio\tasks.py:660]>
# 取消状态:True
# 取消任务:<Task pending coro=<work() running at training1.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000026ED0CA3B88>()]> cb=[gather.<locals>._done_callback() at F:\Anaconda3\lib\asyncio\tasks.py:660]>
# 取消状态:True
# 取消任务:<Task pending coro=<work() running at training1.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000026ED0CA3C48>()]> cb=[gather.<locals>._done_callback() at F:\Anaconda3\lib\asyncio\tasks.py:660]>
# 取消状态:True

6.loop.run_forever 无限循环

排定任务

排定 task / future 在事件循环中的执行顺序,也就是对应的协程先执行哪个,遇到 IO 阻塞时,CPU 转而运行哪个任务,这是我们在进行异步编程时的一个需求。

前文所示的多任务程序中,事件循环里的任务的执行顺序由 asyncio.ensure_future / loop.create_task 和 asyncio.gather 排定。

run_forever 方法为无限运行事件循环,需要自定义 loop.stop 方法并执行之才会停止。

有两种执行loop.stop的方法:

  1. 单任务事件循环,将 loop 作为参数传入协程函数创建协程,在协程内部执行 loop.stop 方法停止事件循环。
  2. 多任务事件循环,使用回调函数执行 loop.stop 停止事件循环

在协程内部执行loop.stop方法停止事件循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def work(loop, t):
print('start')
await asyncio.sleep(t)
print('after {}s stop'.format(t))
loop.stop() # 停止事件循环,stop 后仍可重新运行

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(work(loop, 1))
loop.run_forever() # 无限运行事件循环,直至 loop.stop 停止
loop.close() # 关闭事件循环,只有 loop 处于停止状态才会执行

# start
# after 1s stop

使用回调函数执行 loop.stop 停止事件循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import time
import asyncio
import functools

def loop_stop(loop, future): # 函数的最后一个参数须为 future / task
loop.stop() # 停止事件循环,stop 后仍可重新运行

async def work(t): # 协程函数
print('start')
await asyncio.sleep(t) # 模拟 IO 操作
print('after {}s stop'.format(t))

def main():
loop = asyncio.get_event_loop()
# 创建任务收集器,参数为任意数量的协程,任务收集器本身也是 task / future 对象
tasks = asyncio.gather(work(1), work(2))
# 任务收集器的 add_done_callback 方法添加回调函数
# 当所有任务完成后,自动运行此回调函数
# 注意 add_done_callback 方法的参数是回调函数
# 这里使用 functools.partial 方法创建偏函数以便将 loop 作为参数加入
tasks.add_done_callback(functools.partial(loop_stop, loop))
loop.run_forever() # 无限运行事件循环,直至 loop.stop 停止
loop.close() # 关闭事件循环

if __name__ == '__main__':
start = time.time()
main()
end = time.time()
print('耗时:{:.4f}s'.format(end - start))

7.将普通函数作为任务注入事件循环的三种方法

  • loop.call_soon
  • loop.call_later
  • loop.call_at & loop.time

这三个 call_xxx 方法的作用都是将普通函数作为任务排定到事件循环中,返回值都是 asyncio.events.TimerHandle 实例

注意:它们不是协程任务 ,不能作为 loop.run_until_complete 的参数。

loop.call_soon

事件循环的 call_soon 方法可以将普通函数作为任务加入到事件循环并立即排定任务的执行顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import time

def hello(name): # 普通函数
print('[hello] Hello, {}'.format(name))

async def work(t, name): # 协程函数
print('[work ] start', name)
await asyncio.sleep(t)
print('[work ] {} after {}s stop'.format(name, t))

def main():
loop = asyncio.get_event_loop()
# 向事件循环中添加任务
asyncio.ensure_future(work(1, 'A')) # 第 1 个执行
# call_soon 将普通函数当作 task 加入到事件循环并排定执行顺序
# 该方法的第一个参数为普通函数名字,普通函数的参数写在后面
loop.call_soon(hello, 'Tom') # 第 2 个执行
# 向事件循环中添加任务
loop.create_task(work(2, 'B')) # 第 3 个执行
# 阻塞启动事件循环,顺便再添加一个任务
loop.run_until_complete(work(3, 'C')) # 第 4 个执行

if __name__ == '__main__':
main()

# [work ] start A
# [hello] Hello, Tom
# [work ] start B
# [work ] start C
# [work ] A after 1s stop
# [work ] B after 2s stop
# [work ] C after 3s stop

loop.call_later

此方法同 loop.call_soon 一样,可将普通函数作为任务放到事件循环里,不同之处在于此方法可延时执行,第一个参数为延时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import functools

def hello(name): # 普通函数
print('[hello] Hello, {}'.format(name))

async def work(t, name): # 协程函数
print('[work{}] start'.format(name))
await asyncio.sleep(t)
print('[work{}] stop'.format(name))

def main():
loop = asyncio.get_event_loop()
asyncio.ensure_future(work(1, 'A')) # 任务 1
loop.call_later(1.2, hello, 'Tom') # 任务 2
loop.call_soon(hello, 'Kitty') # 任务 3
task4 = loop.create_task(work(2, 'B')) # 任务 4
loop.call_later(1, hello, 'Jerry') # 任务 5
loop.run_until_complete(task4)

if __name__ == '__main__':
main()
# [workA] start
# [hello] Hello, Kitty
# [workB] start
# [hello] Hello, Jerry
# [workA] stop
# [hello] Hello, Tom
# [workB] stop

毫无疑问,这五个任务在一个事件循环里是顺序执行,遇到阻塞执行下一个,程序执行顺序如下:

  1. 首先执行任务一,打印一行后阻塞 1 秒,执行任务二

  2. 任务二是 call_later 1.2 秒,就相当于一个 1.2 秒的 asyncio.sleep

    ==注意,call_later 这个延时 1.2 秒是事件循环启动时就开始计时的
    (所以任务二阻塞,执行任务三)==

  3. 因为任务二阻塞,执行任务三,

  4. 接着执行任务四,打印一行后阻塞 2 秒

  5. 接着执行任务五,因为 call_later 1 秒,发生阻塞,第一轮结束.

以上是五个任务第一轮的执行情况,
第二轮开始前,此时所有的任务(任务1,任务2,任务4.任务5)都在阻塞中,所以CPU 一直候着

  1. 第一个发出执行信号的是任务五,它只阻塞 1 秒
    (上面已经说了,这个 1 秒是从事件循环启动时开始算,所以这个阻塞肯定比任务一的阻塞 1 秒先结束)

  2. CPU 执行完任务五,任务一也阻塞结束了,执行任务一

  3. 然后是任务二,最后是任务四

  4. 第二轮打印了 4 行,全部任务完成,停止事件循环

总结 :

  1. 第一轮 : 任务1 - 任务3 - 任务4
  2. 第二轮 : 任务5 - 任务1 - 任务2 - 任务4

我们可以总结事件循环的执行顺序:

  • 首先按顺序执行各个任务,一旦任务阻塞,就跳转到下一个任务,直到第一轮结束
  • 接下来,看哪个任务准备好了,就执行哪个任务
    (如果都准备好了就执行前面的任务)

loop.call_at & loop.time

  • call_soon 立刻执行
  • call_later 延时执行
  • call_at 在某时刻执行

loop.time 就是事件循环内部的一个计时方法,返回值是时刻,数据类型是 float

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import functools

def hello(name): # 普通函数
print('[hello] Hello, {}'.format(name))

async def work(t, name): # 协程函数
print('[work{}] start'.format(name))
await asyncio.sleep(t)
print('[work{}] stop'.format(name))


def main():
loop = asyncio.get_event_loop()
start = loop.time() # 事件循环内部时刻
asyncio.ensure_future(work(1, 'A')) # 任务 1

# loop.call_later(1.2, hello, 'Tom')
# 上面注释这行等同于下面这行
loop.call_at(start+1.2, hello, 'Tom') # 任务 2

loop.call_soon(hello, 'Kitty') # 任务 3

task4 = loop.create_task(work(2, 'B')) # 任务 4
# loop.call_later(1, hello, 'Jerry')
# 上面注释这行等同于下面这行
loop.call_at(start+1, hello, 'Jerry') # 任务 5

loop.run_until_complete(task4)

main()
# [workA] start
# [hello] Hello, Kitty
# [workB] start
# [hello] Hello, Jerry
# [workA] stop
# [hello] Hello, Tom
# [workB] stop

8.协程的通讯方法

asyncio.lock

协程锁:
asyncio.lock 应该叫做异步 IO 锁,之所以叫协程锁,是因为它通常使用在子协程中,其作用是将协程内部的一段代码锁住,直到这段代码运行完毕解锁。

协程锁的固定用法是使用 async with 创建协程锁的上下文环境,将代码块写入其中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import asyncio

l = []
lock = asyncio.Lock() # 协程锁

async def work(name):
print(r'\\\\\\\\\\\\',name) # 打印此信息是为了测试协程锁的控制范围
# 这里加个锁,第一次调用该协程,运行到这个语句块,上锁
# 当语句块结束后解锁,开锁前该语句块不可被运行第二次

# 如果上锁后有其它任务调用了这个协程函数,运行到这步会被阻塞,直至解锁
# with 是普通上下文管理器关键字,async with 是异步上下文管理器关键字

# 能够使用 with 关键字的对象须有 __enter__ 和 __exit__ 方法
# 能够使用 async with 关键字的对象须有 __aenter__ 和 __aexit__ 方法

# async with 会自动运行 lock 的 __aenter__ 方法,该方法会调用 acquire 方法上锁
# 在语句块结束时自动运行 __aexit__ 方法,该方法会调用 release 方法解锁

# 这和 with 一样,都是简化 try ... finally 语句
async with lock:
print('{} start'.format(name)) # 头一次运行该协程时打印
if 'x' in l: # 如果判断成功
return name # 直接返回结束协程,不再向下执行
print('++++++++++',name)
await asyncio.sleep(0) # asyncio.sleep(0) 一样也会切换协程
print('----------',name)
l.append('x')
print('{} end'.format(name))
return name

async def one():
name = await work('one')
print('{} ok'.format(name))

async def two():
print('///////////')
name = await work('two')
print('{} ok'.format(name))

def main():
loop = asyncio.get_event_loop()
tasks = asyncio.wait([one(), two()])
loop.run_until_complete(tasks)

if __name__ == '__main__':
main()

# \\\\\\\\\\\\ one
# one start
# ++++++++++ one
# ///////////
# \\\\\\\\\\\\ two
# ---------- one
# one end
# one ok
# two start
# two ok

执行步骤解析:

  1. 协程one当执行到++++++++++ one后执行await asyncio.sleep(0),此时lock并没有解开,就切换协程two了
  2. 协程two开始执行,打印///////////
  3. 协程two执行完\\\\\\\\\\\\ two,后执行async with lock,但是此时lock并没有解开.切换到协程one
  4. 协程one恢复执行---------- one.当执行完return name后lock解开.
  5. 协程one执行完one ok,协程one执行完毕,开始执行协程two
  6. 执行协程two

示例二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import asyncio
import functools


def unlock(lock):
print('callback releasing lock')
lock.release()

async def coro1(lock):
print('coro1 wating for the lock')
with await lock:
print('coro1 acquired lock')
print('coro1 released lock')

async def coro2(lock):
print('coro2 wating for the lock')
await lock
try:
print('coro2 acquired lock')
finally:
print('coro2 released lock')
lock.release()

async def main(loop):
lock = asyncio.Lock()
print('acquiring the lock before starting coroutines')
await lock.acquire()
print('lock acquired: {}'.format(lock.locked()))

loop.call_later(0.1, functools.partial(unlock, lock))

print('waiting for coroutines')
await asyncio.wait([coro1(lock), coro2(lock)])

event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

# acquiring the lock before starting coroutines
# lock acquired: True
# waiting for coroutines
# coro1 wating for the lock
# coro2 wating for the lock
# callback releasing lock
# coro1 acquired lock
# coro1 released lock
# coro2 acquired lock
# coro2 released lock

asyncio.Event

asyncio.Event 类似 threading.Event 用来允许多个消费者等待某个事情发生,不用通过监听一个特殊的值的来说实现类似通知的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import functools


def set_event(event):
print('setting event in callback')
event.set()


async def coro1(event):
print('coro1 waiting for event')
await event.wait()
print('coro1 triggered')


async def coro2(event):
print('coro2 waiting for event')
await event.wait()
print('coro2 triggered')


async def main(loop):
event = asyncio.Event()
print('event start state: {}'.format(event.is_set()))
loop.call_later(
0.1, functools.partial(set_event, event)
)
await asyncio.wait([coro1(event), coro2(event)])
print('event end state: {}'.format(event.is_set()))


event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

# event start state: False
# coro1 waiting for event
# coro2 waiting for event
# setting event in callback
# coro1 triggered
# coro2 triggered
# event end state: True

Lock 一样,coro1()coro2() 都在等待 event 被设置。

不同的是它们都在 event 状态一发生变化的时候就启动了,它们不需要对 event 对象获取一个唯一的所有权。

简单来说 : event 能唤醒所有等待中的 coroutine

asyncio.condition

Condition 的效果类似 Event,不同的是它不是唤醒所有等待中的 coroutine, 而是通过 notify() 唤醒指定数量的待唤醒 coroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import asyncio


async def consumer(condition, n):
with await condition:
print('consumer {} is waiting'.format(n))
await condition.wait()
print('consumer {} triggered'.format(n))
print('ending consumer {}'.format(n))


async def manipulate_condition(condition):
print('starting manipulate_condition')

await asyncio.sleep(0.1)

for i in range(1, 3):
with await condition:
print('notifying {} consumers'.format(i))
condition.notify(n=i)
await asyncio.sleep(0.1)

with await condition:
print('notifying remaining consumers')
condition.notify_all()

print('ending manipulate_condition')


async def main(loop):
condition = asyncio.Condition()

consumers = [
consumer(condition, i)
for i in range(5)
]
loop.create_task(manipulate_condition(condition))

await asyncio.wait(consumers)


event_loop = asyncio.get_event_loop()
try:
result = event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

# starting manipulate_condition
# consumer 0 is waiting
# consumer 4 is waiting
# consumer 1 is waiting
# consumer 2 is waiting
# consumer 3 is waiting
# notifying 1 consumers
# consumer 0 triggered
# ending consumer 0
# notifying 2 consumers
# consumer 4 triggered
# ending consumer 4
# consumer 1 triggered
# ending consumer 1
# notifying remaining consumers
# ending manipulate_condition
# consumer 2 triggered
# ending consumer 2
# consumer 3 triggered
# ending consumer 3
  • 启动了五个 Condition 的消费者,每个都使用 wait() 方法来等待它们可以继续处理的通知。
  • manipulate_condition() 首先通知了一个消费者,然后又通知了两个消费者,最后通知剩下的所有消费者。

在线程中使用condition需要注意:
必须每个线程先获取conditioncond.acquire(),最后必须释放cond.release()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import threading, time

class Hider(threading.Thread):
def __init__(self, cond, name):
super(Hider, self).__init__()
self.cond = cond
self.name = name

def run(self):
time.sleep(1) #确保先运行Seeker中的方法
self.cond.acquire()

print(self.name + ': 我已经把眼睛蒙上了')
self.cond.notify()
self.cond.wait()
print(self.name + ': 我找到你了哦 ~_~')
self.cond.notify()

self.cond.release()
print(self.name + ': 我赢了')

class Seeker(threading.Thread):
def __init__(self, cond, name):
super(Seeker, self).__init__()
self.cond = cond
self.name = name

def run(self):
self.cond.acquire()
self.cond.wait()
print(self.name + ': 我已经藏好了,你快来找我吧')
self.cond.notify()
self.cond.wait()
self.cond.release()
print(self.name + ': 被你找到了,哎~~~')

cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()

# hider: 我已经把眼睛蒙上了
# seeker: 我已经藏好了,你快来找我吧
# hider: 我找到你了哦 ~_~
# hider: 我赢了
# seeker: 被你找到了,哎~~~

在协程里也一样,每个协程函数都必须先获取condition,最后释放.这里我们使用了with自动获取和释放

1
2
3
with await condition:
print('consumer {} is waiting'.format(n))
await condition.wait()

注意:
为了更明显设置了新特性:
with await要改为async with:

1
2
3
4
5
with await condition:
print('consumer {} is waiting'.format(n))

async with condition:
print('consumer {} is waiting'.format(n))

asyncio.Queue

asyncio.Queue 为 coroutines 实现了一个先进先出的数据结构,类似多线程中的 queue.Queue ,多进程中的 multiprocessing.Queue

queue.Queue的queue.task_done()的功能 :
每task_done一次 就从队列里删掉一个元素,这样在最后queue.join()的时候根据队列长度是否为零来判断队列是否结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio


async def consumer(n, q):
print('consumer {}: waiting for item'.format(n))
while True:
print('consumer {}: waiting for item'.format(n))
item = await q.get()
print('consumer {}: has item {}'.format(n, item))
# 在这个程序中 None 是个特殊的值,表示终止信号
if item is None:
q.task_done()
break
else:
await asyncio.sleep(0.01 * item)
q.task_done()

print('consumer {}: ending'.format(n))


async def producer(q, num_workers):
print('producer: starting')
# 向队列中添加一些数据
for i in range(num_workers * 3):
# 因为Queue长度为2,所以,当第三次的时候,就会await了(前两次不会)
await q.put(i)
print('producer: added task {} to the queue'.format(i))

# 通过 None 这个特殊值来通知消费者退出
print('producer: adding stop signals to the queue')
for i in range(num_workers):
await q.put(None)
print('producer: waiting for queue to empty')
await q.join()
print('producer: ending')


async def main(loop, num_consumers):
# 创建指定大小的队列,这样的话生产者将会阻塞
# 直到有消费者获取数据
q = asyncio.Queue(maxsize=num_consumers)

# 调度消费者
consumers = [
loop.create_task(consumer(i, q))
for i in range(num_consumers)
]

# 调度生产者
prod = loop.create_task(producer(q, num_consumers))

# 等待所有 coroutines 都完成
await asyncio.wait(consumers + [prod])


event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop, 2))
finally:
event_loop.close()


# consumer 0: waiting for item
# consumer 0: waiting for item
# consumer 1: waiting for item
# consumer 1: waiting for item
# producer: starting
# producer: added task 0 to the queue
# producer: added task 1 to the queue
# consumer 0: has item 0
# consumer 1: has item 1
# producer: added task 2 to the queue
# producer: added task 3 to the queue
# consumer 0: waiting for item
# consumer 0: has item 2
# producer: added task 4 to the queue
# consumer 1: waiting for item
# consumer 1: has item 3
# producer: added task 5 to the queue
# producer: adding stop signals to the queue
# consumer 0: waiting for item
# consumer 0: has item 4
# consumer 1: waiting for item
# consumer 1: has item 5
# producer: waiting for queue to empty
# consumer 0: waiting for item
# consumer 0: has item None
# consumer 0: ending
# consumer 1: waiting for item
# consumer 1: has item None
# consumer 1: ending
# producer: ending

9.获取协程返回值的方法

一个协程中await了另外一个协程

1
2
3
4
5
6
7
8
9
10
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

dones, pendings = await asyncio.wait(tasks)

for task in dones:
print('Task ret: ', task.result())

asyncio.gather创建协程对象

如果使用的是 asyncio.gather创建协程对象,那么await的返回值就是协程运行的结果。

1
2
3
4
5
6
7
8
9
10
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

results = await asyncio.gather(*tasks)

for result in results:
print('Task ret: ', result)

run_until_complete返回协程的结果

在函数里return await asyncio.gather(*tasks)后就可以使用results = loop.run_until_complete(main())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

return await asyncio.gather(*tasks)


loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
print('Task ret: ', result)

也可以使用return await asyncio.wait(tasks):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(main())

for task in done:
print('Task ret: ', task.result())

asyncio的as_completed方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
for task in asyncio.as_completed(tasks):
result = await task # 注意这里有一个await


loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())

as_completed最常使用的形式:

1
2
for task in asyncio.as_completed(tasks):
result = await task

注意result = await task中有一个await

10.asyncio的futrue对象

Future 对象表示一个还未完成的工作,事件循环可以监视 Future 对象的状态直至它变成 done,这将运行程序的一部分等待另一部分完成一些工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio

def mark_done(future, result):
print('setting future result to {!r}'.format(result))
# 为future对象设置值
future.set_result(result)


event_loop = asyncio.get_event_loop()
all_done = asyncio.Future()
try:
print('scheduling make_done')
event_loop.call_soon(mark_done, all_done, 'the result')
print('entering event loop')
# await all_done这个future对象返回结果
result = event_loop.run_until_complete(all_done)
print('returned result: {!r}'.format(result))
finally:
print('closing event loop')
event_loop.close()

print('future result: {!r}'.format(all_done.result()))

# scheduling make_done
# entering event loop
# setting future result to 'the result'
# returned result: 'the result'
# closing event loop
# future result: 'the result'

注意:
event_loop.run_until_complete(all_done)的意思是将all_done放入事件循环,然后开始时间循环的意思.(也就是说,这个事件循环可能存在已经注册了的task)
简单来说就是 : ==启动事件循环,顺便再添加一个任务==

1
2
3
4
5
6
7
8
9
10
11
def main():
loop = asyncio.get_event_loop()
# 向事件循环中添加任务
asyncio.ensure_future(work(1, 'A')) # 第 1 个执行
# call_soon 将普通函数当作 task 加入到事件循环并排定执行顺序
# 该方法的第一个参数为普通函数名字,普通函数的参数写在后面
loop.call_soon(hello, 'Tom') # 第 2 个执行
# 向事件循环中添加任务
loop.create_task(work(2, 'B')) # 第 3 个执行
# 阻塞启动事件循环,顺便再添加一个任务
loop.run_until_complete(work(3, 'C')) # 第 4 个执行

当调用 set_result 方法后,Future 对象的状态会被修改为 done, 同时 Future 实例也会保存设置的结果值,供随后使用

11.使用抽象类 Protocol 实现异步 I/O

服务端

  • EchoServer继承 asyncio.Protocol ,用来处理与客户端的通信。

    protocol 的方法是基于服务端 socket 事件来触发的。

  • 每当有一个新的客户端连接的时候,就会触发调用 connection_made() 方法。

    1
    2
    3
    4
    5
    6
    7
    def connection_made(self, transport):
    self.transport = transport
    self.address = transport.get_extra_info('peername')
    self.log = logging.getLogger(
    'EchoServer_{}_{}'.format(*self.address)
    )
    self.log.debug('connection accepted')
    • transport 参数是一个 asyncio.Transport 实例对象,这个对象抽象了一系列使用 socket 进行异步 I/O 操作的方法。

    • 不同的通信协议提供了不同的 transport 实现,但是它们都有同样的 API.

      (比如,有一些 transport 类用来与 socket 通信,有些用来跟子进程通过管道通信)

    • 可以通过 get_extra_info() 获取进来的客户端的地址信息。

  • 连接建立以后,当有数据从客户端发到服务端的时候会使用传输过来的数据调用 data_received() 方法。
    这里我们记录一下收到的数据,然后立即发收到的数据通过 transport.write() 发回客户端。

    1
    2
    3
    4
    def data_received(self, data):
    self.log.debug('received {!r}'.format(data))
    self.transport.write(data)
    self.log.debug('sent {!r}'.format(data))
  • 一些 transport 支持一个特殊的 end-of-file 标识符(EOF)。当遇到一个 EOF 的时候,eof_received() 方法会被调用。
    在本次实现中,EOF 会被发送会客户端,表示这个信号已经被收到。因为不是所有的 transport 都支持这个 EOF ,这个协议会首先询问 transport 是否可以安全的发送 EOF .

    1
    2
    3
    4
    def eof_received(self):
    self.log.debug('received EOF')
    if self.transport.can_write_eof():
    self.transport.write_eof()
  • 当一个连接被关闭的时候,无论是正常关闭还是因为一个错误导致的关闭,协议的 connection_lost() 方法都会被调用,如果是因为出错,参数中会包含一个相关的异常对象,否则这个对象就是 None.

    1
    2
    3
    4
    5
    6
    def connection_lost(self, error):
    if error:
    self.log.error('ERROR: {}'.format(error))
    else:
    self.log.debug('closing')
    super().connection_lost(error)
  • 需要两步来启动这个服务器。

    1. 首先,应用告诉事件循环创建使用 protocol 类和 hostname 以及 socket 监听的端口信息来创建一个新的 server 对象。

      create_server() 方法是一个 coroutine, 所以它的结果必须通过事件循环来处理这样才能真正的启动服务器。

      这个 coroutine 完成的时候会返回一个 与事件循环相关联的 asyncio.Server 实例.

      1
      2
      3
      factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)
      server = event_loop.run_until_complete(factory)
      log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))
    2. 然后这个事件循环需要被运行,以便接收客户端请求以及处理相关事件。

      对于一个长时间运行的服务器程序来说, run_forever() 方法是最简便的实现这个功能的方法。

      当事件循环被停止的时候,无论是通过应用程序代码还是通过进程信号停止的,server 都可以被关闭以便能够正确的清理 socket 资源

      1
      2
      3
      4
      5
      6
      7
      8
      try:
      event_loop.run_forever()
      finally:
      log.debug('closing server')
      server.close()
      event_loop.run_until_complete(server.wait_closed())
      log.debug('closing event loop')
      event_loop.close()

客户端

  • 使用 protocol 类实现一个客户端的代码跟实现一个服务器端非常的相似.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import asyncio
    import functools
    import logging
    import sys

    MESSAGES = [
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
    ]
    SERVER_ADDRESS = ('localhost', 10000)

    logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
    )
    log = logging.getLogger('main')

    event_loop = asyncio.get_event_loop()
  • 客户端 protocol 类定义了跟服务器端相同的方法,但是是不同的实现。

    future 参数是一个 Future 实例,用来作为客户端已经完成了一次接收来自服务端数据操作的信号。

    1
    2
    3
    4
    5
    6
    class EchoClient(asyncio.Protocol):
    def __init__(self, messages, future):
    super().__init__()
    self.messages = messages
    self.log = logging.getLogger('EchoClient')
    self.f = future
  • 当客户端成功连接到服务器时,会立即开始通信。客户端一次发送了一堆数据,因为网络等原因可能会把多个消息合并到一个消息中。当所有消息都送达的时候,将发送一个 EOF

    虽然看起你所有的数据都立即被发送了,事实上 transport 对象会缓冲发出去的数据并且会设置一个回调来传输最终的数据,当 socket 的缓冲区准备好可以发送的时候会调用这个回调。这些都是由 transport 来实现的,所以应用代码可以按照 I/O 操作就像看起来那么发生的样子来实现.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    def connection_made(self, transport):
    self.transport = transport
    self.address = transport.get_extra_info('peername')
    self.log.debug(
    'connectiong to {} port {}'.format(*self.address)
    )
    # 也可以使用 transport.writelines()
    # 这里使用 transport.write() 是为了方便
    # 记录发送的每一行内容
    for msg in self.messages:
    transport.write(msg)
    self.log.debug('sending {!r}'.format(msg))

    if transport.can_write_eof():
    transport.write_eof()
  • 当接收到来着服务器端的响应时,将会把这个响应记录下来

    1
    2
    def data_received(self, data):
    self.log.debug('received {!r}'.format(data))
  • 无论是收到 end-of-file 标记还是服务器端断开了连接,本地 transport 对象都将关闭并且 future 对象都会被通过设置一个结果值的方式标记为已完成。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def eof_received(self):
    self.log.debug('received EOF')
    self.transport.close()
    if not self.f.done():
    self.f.set_result(True)

    def connnection_lost(self, exc):
    self.log.debug('server closed connection')
    self.transport.close()
    if not self.f.done():
    self.f.set_result(True)
    super().connectiong_lost(exc)
  • 然后创建所需的 future, 以及客户端 coroutine

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    client_completed = asyncio.Future()
    client_factory = functools.partial(
    EchoClient,
    messages=MESSAGES,
    future=client_completed
    )
    factory_coroutine = event_loop.create_connection(
    client_factory,
    *SERVER_ADDRESS,
    )
  • 然后使用两次 wait 来处理客户端发送完成并退出的操作

    1
    2
    3
    4
    5
    6
    7
    og.debug('waiting for client to complete')
    try:
    event_loop.run_until_complete(factory_coroutine)
    event_loop.run_until_complete(client_completed)
    finally:
    log.debug('closing event loop')
    event_loop.close()

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# asyncio_echo_server_protocol.py
import asyncio
import logging
import sys


SERVER_ADDRESS = ('localhost', 10000)

logging.basicConfig(
level=logging.DEBUG,
format='%(name)s: %(message)s',
stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()


# EchoServer用来处理与客户端的通信。
# protocol 的方法是基于服务端 socket 事件来触发的。
class EchoServer(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.log = logging.getLogger(
'EchoServer_{}_{}'.format(*self.address)
)
self.log.debug('connection accepted')

def data_received(self, data):
self.log.debug('received {!r}'.format(data))
self.transport.write(data)
self.log.debug('sent {!r}'.format(data))

def eof_received(self):
self.log.debug('received EOF')
if self.transport.can_write_eof():
self.transport.write_eof()

def connection_lost(self, error):
if error:
self.log.error('ERROR: {}'.format(error))
else:
self.log.debug('closing')
super().connection_lost(error)


factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))

try:
event_loop.run_forever()
finally:
log.debug('closing server')
server.close()
event_loop.run_until_complete(server.wait_closed())
log.debug('closing event loop')
event_loop.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# asyncio_echo_client_protocol.py
import asyncio
import functools
import logging
import sys

MESSAGES = [
b'This is the message. ',
b'It will be sent ',
b'in parts.',
]
SERVER_ADDRESS = ('localhost', 10000)

logging.basicConfig(
level=logging.DEBUG,
format='%(name)s: %(message)s',
stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()


class EchoClient(asyncio.Protocol):
def __init__(self, messages, future):
super().__init__()
self.messages = messages
self.log = logging.getLogger('EchoClient')
self.f = future

def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.log.debug(
'connectiong to {} port {}'.format(*self.address)
)
# 也可以使用 transport.writelines()
# 这里使用 transport.write() 是为了方便
# 记录发送的每一行内容
for msg in self.messages:
transport.write(msg)
self.log.debug('sending {!r}'.format(msg))

if transport.can_write_eof():
transport.write_eof()

def data_received(self, data):
self.log.debug('received {!r}'.format(data))

def eof_received(self):
self.log.debug('received EOF')
self.transport.close()
if not self.f.done():
self.f.set_result(True)

def connnection_lost(self, exc):
self.log.debug('server closed connection')
self.transport.close()
if not self.f.done():
self.f.set_result(True)
super().connectiong_lost(exc)


client_completed = asyncio.Future()
client_factory = functools.partial(
EchoClient,
messages=MESSAGES,
future=client_completed
)
factory_coroutine = event_loop.create_connection(
client_factory,
*SERVER_ADDRESS,
)

log.debug('waiting for client to complete')
try:
event_loop.run_until_complete(factory_coroutine)
event_loop.run_until_complete(client_completed)
finally:
log.debug('closing event loop')
event_loop.close()