进程与线程

使用多线程

继承Thread类

1
2
3
4
5
6
7
8
9
10
11
12
13
import time
from threading import Thread


class thread1(Thread):
def run(self):
while True:
print('------------')
time.sleep(1)


for each in range(5):
thread1().start()

直接使用Thread类

1
2
3
4
5
6
7
8
9
10
11
12
13
import time
from threading import Thread


def func(num):
while True:
print(num)
num += 1
time.sleep(1)


t = Thread(target=func,args=(1,))
t.start()

使用ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
import time
import threading
from concurrent.futures import ThreadPoolExecutor


def func(num):
print('---',num)
time.sleep(1)


if __name__ == '__main__':
with ThreadPoolExecutor(3) as e:
e.map(func,range(10))

使用多进程

使用Process类

1
2
3
4
5
6
7
8
9
10
import time
from multiprocessing import Process

def func(num):
for n in range(num):
print(n)
time.sleep(1)

for each in range(10):
Process(target=func,args=(each,)).start()

使用Pool

注意 : 必须将它放在if __name__ == '__main__':

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
from multiprocessing import Pool


def func(num):
for n in range(num):
print(n)
time.sleep(1)



if __name__ == '__main__':
p = Pool(10)
for n in range(10):
p.apply_async(func,args=(n,))

p.close()
p.join()

使用Queue

注意同样要使用if __name__ == '__main__':

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
from multiprocessing import Process,Queue


def read(q):
while True:
res = q.get()
print(res)
time.sleep(1)

def write(q):
num = 0
while True:
num += 1
q.put(num)
time.sleep(1)

if __name__ == '__main__':
q = Queue()
r = Process(target=read,args=(q,))
w = Process(target=write,args=(q,))

w.start()
r.start()

time.sleep(5)
# 强制结束
r.terminate()
w.terminate()
print('强制结束')

线程限制

编写一个资源抢占的多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
from threading import Thread

def func():
global num
for _ in range(1000000):
num += 1


if __name__ == '__main__':
num = 0

t1 = Thread(target=func)
t2 = Thread(target=func)

t1.start()
t2.start()

t1.join()
t2.join()
print(num)

使用Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
from threading import Thread,Lock


def run1():
global num
for _ in range(1000000):
with lock:
num += 1


if __name__ == '__main__':
num = 0

lock = Lock()
t1 = Thread(target=run1)
t2 = Thread(target=run1)

t1.start()
t2.start()
t1.join()
t2.join()
print(num)

使用RLock

可重入锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
from threading import Thread,RLock


def fun():
global num
for _ in range(10000):
with lock:
num += 1
with lock:
print(num)


if __name__ == '__main__':
lock = RLock()
num = 0

Thread(target=fun).start()

使用ThreadLocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from threading import Thread,local


def fun(num):
local.num = num
for _ in range(1000000):
local.num += 1
print('-----',local.num)


if __name__ == '__main__':
local = local()
num = 0

t1 = Thread(target=fun,args=(num,))
t2 = Thread(target=fun,args=(num,))

t1.start()
t2.start()

t1.join()
t2.join()
print('-----',num)

使用Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
from threading import Thread,Semaphore


def fun(num):
while True:
num += 1
with sem:
print('--------',num)
time.sleep(2)


if __name__ == '__main__':
sem = Semaphore(3)
num = 0

for i in range(9):
Thread(target=fun,args=(i,)).start()

time.sleep(15)
print('*******',num)

使用Barrier

凑够一定数量的线程才能执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
from threading import Thread,Barrier


def fun(num):
bar.wait()
print('--------',num)
time.sleep(2)


if __name__ == '__main__':
bar = Barrier(3)
num = 0

for i in range(9):
time.sleep(1)
Thread(target=fun,args=(i,)).start()


time.sleep(15)
print('*******',num)

使用Timer

定时线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time
from threading import Thread,Timer


def fun():
print('--------')
time.sleep(2)


if __name__ == '__main__':
t = Timer(5,fun)

t.start()
t.join()
print('*******')

线程通讯

使用Event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time
from threading import Thread,Event


def fun():
while True:
event.wait()
print('--------')
event.clear()


if __name__ == '__main__':
event = Event()

t = Thread(target=fun)
t.start()

for _ in range(5):
time.sleep(1)
event.set()

print('********')

wait – set – clear

对应着阻塞解除阻塞重新阻塞整个流程

使用Condition

用于线程之间互相唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import time
from threading import Thread,Condition


def fun1():
while True:
with cond:
cond.wait()
print('--------')
time.sleep(1)
cond.notify()


def fun2():
while True:
with cond:
cond.notify()
print('*******')
time.sleep(1)
cond.wait()



if __name__ == '__main__':
cond = Condition()

t1 = Thread(target=fun1)
t2 = Thread(target=fun2)

t1.start()
t2.start()

多任务的最佳实践

设计Master-Worker模式,Master负责分配任务,Worker负责执行任务

  • 主进程就是Master,其他进程就是Worker
  • 优点:稳定性高:
    一个子进程崩溃了,不会影响主进程和其他子进程,当然主进程挂掉了,所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低
  • 缺点:
    1. 创建进程的代价大:
      在Unix/Linux系统下,用fork调用还行,在Window创建进程开销巨大。
    2. 操作系统能同时运行的进程数有限:
      在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题

协程

  • 不能在生成器中使用await
  • 不能在 async 定义的协程中使用yield
  • 协程对象不能直接运行,必须放入事件循环中或者由 yield from 语句调用

yield from的两大优势

  1. 避免嵌套循环
  2. 转移控制权

asyncio的组件

  • event_loop 事件循环:
    程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
  • coroutine 协程:
    协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  • task 任务:
    一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  • future:
    代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
    (task对象是Future类的子类)
  • async/await 关键字:
    python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

传入loop的方法

直接传入协程

1
2
3
4
5
async def read():pass
async def write():pass

loop = asyncio.get_event_loop()
loop.run_until_complete(write())

使用asyncio.wait传入协程列表

1
2
3
4
5
6
7
8
async def coro1():pass
async def coro2():pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1(),
coro2(),
]))

使用asyncio.gather传入协程

1
2
3
4
5
6
group1 = asyncio.gather(*[func("A" ,i) for i in range(1, 3)])
group2 = asyncio.gather(*[func("B", i) for i in range(1, 5)])
group3 = asyncio.gather(*[func("B", i) for i in range(1, 7)])

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(group1, group2, group3))

task

创建task的两种方法

使用create_task

1
2
async def do_some_work(x):pass
task = loop.create_task(do_some_work(2))

使用ensure_future

1
2
async def do_some_work(x):pass
task = asyncio.ensure_future(coroutine)

task绑定回调

1
2
3
4
5
async def coroutine(x):pass
def callback(future):pass

task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)

task获取结果

使用task.result()

1
2
3
4
async def coroutine(x):pass
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)
print(task.result())

使用asyncio.wait(tasks)

1
2
3
4
5
6
7
8
9
10
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

dones, pendings = await asyncio.wait(tasks)

for task in dones:
print('Task ret: ', task.result())

asyncio.get_event_looloop.run_until_completep配合asyncio.gather

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

return await asyncio.gather(*tasks)


loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
print('Task ret: ', result)

loop.run_until_complete配合asyncio.wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(main())

for task in done:
print('Task ret: ', task.result())

asyncio的as_completed方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
for task in asyncio.as_completed(tasks):
result = await task # 注意这里有一个await


loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())

动态注册协程

同步

当前线程创建一个事件循环,然后再新建一个线程,在新线程中启动事件循环。当前线程不会被block。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import time
from threading import Thread

def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

def more_work(x):
print('More work {}'.format(x))
time.sleep(x)
print('Finished more work {}'.format(x))

start = time.time()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)

# TIME: 0.0019989013671875
# More work 6
# Finished more work 6
# More work 3
# Finished more work 3

异步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import time
import asyncio
from queue import Queue
from threading import Thread

def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()

async def do_sleep(x, queue, msg=""):
await asyncio.sleep(x)
queue.put(msg)

if __name__ == '__main__':
queue = Queue()

new_loop = asyncio.new_event_loop()

# 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start()

print(time.ctime())

# 动态添加两个协程
# 这种方法,在主线程是异步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop)

while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())

# Sat Jul 13 10:39:27 2019
# 第二个 协程运行完..
# Sat Jul 13 10:39:30 2019
# 第一个 协程运行完..
# Sat Jul 13 10:39:33 2019

async with 和async for

async with

异步上下文管理器指的是在enter和exit方法处能够暂停执行的上下文管理器

  • 实现这样的功能,需要加入两个新的方法:__aenter__ 和__aexit__。
  • 这两个方法都要返回一个 awaitable 类型的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async with EXPR as VAR:
BLOCK

## 上面代码等同于下面:
mgr = (EXPR)
aenter = type(mgr).__aenter__(mgr)
aexit = type(mgr).__aexit__
exc = True

VAR = await aenter
try:
BLOCK
except:
if not await aexit(mgr, *sys.exc_info()):
raise
else:
await aexit(mgr, None, None, None)

简单来说就是:

  • 进来的时候需要await aenter(延时操作)
  • 出去的时候需要await aexit(延时操作)

async for

  • 异步迭代器就是能够在next方法中调用异步代码
  • 一个异步可迭代对象(asynchronous iterable)能够在迭代过程中调用异步代码

为了支持异步迭代:

  1. 一个对象必须实现__aiter__方法,该方法返回一个异步迭代器(asynchronous iterator)对象。
  2. 一个异步迭代器对象必须实现__anext__方法,该方法返回一个awaitable 类型的值。
  3. 为了停止迭代,__anext__必须抛出一个 StopAsyncIteration 异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async for TARGET in ITER:
BLOCK
else:
BLOCK2


## 上面代码等同于下面:
iter = (ITER)
iter = type(iter).__aiter__(iter)
running = True
while running:
try:
TARGET = await type(iter).__anext__(iter)
except StopAsyncIteration:
running = False
else:
BLOCK
else:
BLOCK2

loop.stop暂停事件循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def work(loop, t):
print('start')
await asyncio.sleep(t)
print('after {}s stop'.format(t))
loop.stop() # 停止事件循环,stop 后仍可重新运行

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(work(loop, 1))
loop.run_forever() # 无限运行事件循环,直至 loop.stop 停止
loop.close() # 关闭事件循环,只有 loop 处于停止状态才会执行

# start
# after 1s stop

将普通函数作为任务注入事件循环的三种方法

  • loop.call_soon
  • loop.call_later
  • loop.call_at & loop.time

loop.call_soon

将普通函数作为任务加入到事件循环并立即排定任务的执行顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import time

def hello(name): # 普通函数
print('[hello] Hello, {}'.format(name))

async def work(t, name): # 协程函数
print('[work ] start', name)
await asyncio.sleep(t)
print('[work ] {} after {}s stop'.format(name, t))

def main():
loop = asyncio.get_event_loop()
# 向事件循环中添加任务
asyncio.ensure_future(work(1, 'A')) # 第 1 个执行
# call_soon 将普通函数当作 task 加入到事件循环并排定执行顺序
# 该方法的第一个参数为普通函数名字,普通函数的参数写在后面
loop.call_soon(hello, 'Tom') # 第 2 个执行
# 向事件循环中添加任务
loop.create_task(work(2, 'B')) # 第 3 个执行
# 阻塞启动事件循环,顺便再添加一个任务
loop.run_until_complete(work(3, 'C')) # 第 4 个执行

if __name__ == '__main__':
main()

# [work ] start A
# [hello] Hello, Tom
# [work ] start B
# [work ] start C
# [work ] A after 1s stop
# [work ] B after 2s stop
# [work ] C after 3s stop

loop.call_later

可延时执行,第一个参数为延时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import functools

def hello(name): # 普通函数
print('[hello] Hello, {}'.format(name))

async def work(t, name): # 协程函数
print('[work{}] start'.format(name))
await asyncio.sleep(t)
print('[work{}] stop'.format(name))

def main():
loop = asyncio.get_event_loop()
asyncio.ensure_future(work(1, 'A')) # 任务 1
loop.call_later(1.2, hello, 'Tom') # 任务 2
loop.call_soon(hello, 'Kitty') # 任务 3
task4 = loop.create_task(work(2, 'B')) # 任务 4
loop.call_later(1, hello, 'Jerry') # 任务 5
loop.run_until_complete(task4)

if __name__ == '__main__':
main()
# [workA] start
# [hello] Hello, Kitty
# [workB] start
# [hello] Hello, Jerry
# [workA] stop
# [hello] Hello, Tom
# [workB] stop

注意,call_later 这个延时 1.2 秒是事件循环启动时就开始计时的

loop.call_at & loop.time

  • call_at 在某时刻执行
  • loop.time 就是事件循环内部的一个计时方法,返回值是时刻,数据类型是 float
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import functools

def hello(name): # 普通函数
print('[hello] Hello, {}'.format(name))

async def work(t, name): # 协程函数
print('[work{}] start'.format(name))
await asyncio.sleep(t)
print('[work{}] stop'.format(name))


def main():
loop = asyncio.get_event_loop()
start = loop.time() # 事件循环内部时刻
asyncio.ensure_future(work(1, 'A')) # 任务 1

# loop.call_later(1.2, hello, 'Tom')
# 上面注释这行等同于下面这行
loop.call_at(start+1.2, hello, 'Tom') # 任务 2

loop.call_soon(hello, 'Kitty') # 任务 3

task4 = loop.create_task(work(2, 'B')) # 任务 4
# loop.call_later(1, hello, 'Jerry')
# 上面注释这行等同于下面这行
loop.call_at(start+1, hello, 'Jerry') # 任务 5

loop.run_until_complete(task4)

main()
# [workA] start
# [hello] Hello, Kitty
# [workB] start
# [hello] Hello, Jerry
# [workA] stop
# [hello] Hello, Tom
# [workB] stop

协程的通讯方法

asyncio.lock

协程锁存在的意义 :

  • 协程锁锁住了一些代码A,在执行代码A的时候,有可能就切换协程了,此时的协程锁就会锁住
  • 这样就保证了协程锁里面的代码一定会等待被执行完毕.

协程锁的固定用法是使用 async with 创建协程锁的上下文环境,将代码块写入其中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import asyncio

l = []
lock = asyncio.Lock() # 协程锁

async def work(name):
print(r'\\\\\\\\\\\\',name) # 打印此信息是为了测试协程锁的控制范围
# 这里加个锁,第一次调用该协程,运行到这个语句块,上锁
# 当语句块结束后解锁,开锁前该语句块不可被运行第二次

# 如果上锁后有其它任务调用了这个协程函数,运行到这步会被阻塞,直至解锁
# with 是普通上下文管理器关键字,async with 是异步上下文管理器关键字

# 能够使用 with 关键字的对象须有 __enter__ 和 __exit__ 方法
# 能够使用 async with 关键字的对象须有 __aenter__ 和 __aexit__ 方法

# async with 会自动运行 lock 的 __aenter__ 方法,该方法会调用 acquire 方法上锁
# 在语句块结束时自动运行 __aexit__ 方法,该方法会调用 release 方法解锁

# 这和 with 一样,都是简化 try ... finally 语句
async with lock:
print('{} start'.format(name)) # 头一次运行该协程时打印
if 'x' in l: # 如果判断成功
return name # 直接返回结束协程,不再向下执行
print('++++++++++',name)
await asyncio.sleep(0) # asyncio.sleep(0) 一样也会切换协程
print('----------',name)
l.append('x')
print('{} end'.format(name))
return name

async def one():
name = await work('one')
print('{} ok'.format(name))

async def two():
print('///////////')
name = await work('two')
print('{} ok'.format(name))

def main():
loop = asyncio.get_event_loop()
tasks = asyncio.wait([one(), two()])
loop.run_until_complete(tasks)

if __name__ == '__main__':
main()

# \\\\\\\\\\\\ one
# one start
# ++++++++++ one
# ///////////
# \\\\\\\\\\\\ two
# ---------- one
# one end
# one ok
# two start
# two ok

asyncio.Event

asyncio.Event 类似 threading.Event 用来允许多个消费者等待某个事情发生,不用通过监听一个特殊的值的来说实现类似通知的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import functools


def set_event(event):
print('setting event in callback')
event.set()


async def coro1(event):
print('coro1 waiting for event')
await event.wait()
print('coro1 triggered')


async def coro2(event):
print('coro2 waiting for event')
await event.wait()
print('coro2 triggered')


async def main(loop):
event = asyncio.Event()
print('event start state: {}'.format(event.is_set()))
loop.call_later(
0.1, functools.partial(set_event, event)
)
await asyncio.wait([coro1(event), coro2(event)])
print('event end state: {}'.format(event.is_set()))


event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

# event start state: False
# coro1 waiting for event
# coro2 waiting for event
# setting event in callback
# coro1 triggered
# coro2 triggered
# event end state: True

asyncio.condition

Condition 的效果类似 Event,不同的是它不是唤醒所有等待中的 coroutine, 而是通过 notify() 唤醒指定数量的待唤醒 coroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import asyncio


async def consumer(condition, n):
print('----')
async with condition:
print('consumer {} is waiting'.format(n))
await condition.wait()
print('consumer {} triggered'.format(n))
print('ending consumer {}'.format(n))


async def manipulate_condition(condition):
print('starting manipulate_condition')

await asyncio.sleep(0.1)

for i in range(1, 3):
async with condition:
print('notifying {} consumers'.format(i))
condition.notify(n=i)
await asyncio.sleep(0.1)

async with condition:
print('notifying remaining consumers')
condition.notify_all()

print('ending manipulate_condition')


async def main(loop):
condition = asyncio.Condition()

consumers = [
consumer(condition, i)
for i in range(5)
]
loop.create_task(manipulate_condition(condition))

await asyncio.wait(consumers)


event_loop = asyncio.get_event_loop()
try:
result = event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

# starting manipulate_condition
# ----
# consumer 3 is waiting
# ----
# consumer 0 is waiting
# ----
# consumer 2 is waiting
# ----
# consumer 4 is waiting
# ----
# consumer 1 is waiting
# notifying 1 consumers
# consumer 3 triggered
# ending consumer 3
# notifying 2 consumers
# consumer 0 triggered
# ending consumer 0
# consumer 2 triggered
# ending consumer 2
# notifying remaining consumers
# ending manipulate_condition
# consumer 4 triggered
# ending consumer 4
# consumer 1 triggered
# ending consumer 1

asyncio.Queue

queue.Queue的queue.task_done()的功能 :
task_done一次 就从队列里删掉一个元素,这样在最后queue.join()的时候根据队列长度是否为零来判断队列是否结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio


async def consumer(n, q):
print('consumer {}: waiting for item'.format(n))
while True:
print('consumer {}: waiting for item'.format(n))
item = await q.get()
print('consumer {}: has item {}'.format(n, item))
# 在这个程序中 None 是个特殊的值,表示终止信号
if item is None:
q.task_done()
break
else:
await asyncio.sleep(0.01 * item)
q.task_done()

print('consumer {}: ending'.format(n))


async def producer(q, num_workers):
print('producer: starting')
# 向队列中添加一些数据
for i in range(num_workers * 3):
# 因为Queue长度为2,所以,当第三次的时候,就会await了(前两次不会)
await q.put(i)
print('producer: added task {} to the queue'.format(i))

# 通过 None 这个特殊值来通知消费者退出
print('producer: adding stop signals to the queue')
for i in range(num_workers):
await q.put(None)
print('producer: waiting for queue to empty')
await q.join()
print('producer: ending')


async def main(loop, num_consumers):
# 创建指定大小的队列,这样的话生产者将会阻塞
# 直到有消费者获取数据
q = asyncio.Queue(maxsize=num_consumers)

# 调度消费者
consumers = [
loop.create_task(consumer(i, q))
for i in range(num_consumers)
]

# 调度生产者
prod = loop.create_task(producer(q, num_consumers))

# 等待所有 coroutines 都完成
await asyncio.wait(consumers + [prod])


event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop, 2))
finally:
event_loop.close()


# consumer 0: waiting for item
# consumer 0: waiting for item
# consumer 1: waiting for item
# consumer 1: waiting for item
# producer: starting
# producer: added task 0 to the queue
# producer: added task 1 to the queue
# consumer 0: has item 0
# consumer 1: has item 1
# producer: added task 2 to the queue
# producer: added task 3 to the queue
# consumer 0: waiting for item
# consumer 0: has item 2
# producer: added task 4 to the queue
# consumer 1: waiting for item
# consumer 1: has item 3
# producer: added task 5 to the queue
# producer: adding stop signals to the queue
# consumer 0: waiting for item
# consumer 0: has item 4
# consumer 1: waiting for item
# consumer 1: has item 5
# producer: waiting for queue to empty
# consumer 0: waiting for item
# consumer 0: has item None
# consumer 0: ending
# consumer 1: waiting for item
# consumer 1: has item None
# consumer 1: ending
# producer: ending