通过线程以动画的形式显示文本式旋转指针:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# BEGIN SPINNER_THREAD
import threading
import itertools
import time
import sys

class Signal: # <1>
go = True

def spin(msg, signal): # <2>
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'): # <3>
status = char + ' ' + msg
write(status)
flush()
time.sleep(.1)
write('\x08' * len(status)) # <4>
if not signal.go:
break
write(' ' * len(status) + '\x08' * len(status)) # <6>


def slow_function(): # <7>
# pretend waiting a long time for I/O
time.sleep(3) # <8>
return 42


def supervisor(): # <9>
signal = Signal()
spinner = threading.Thread(target=spin,
args=('thinking!', signal))
print('spinner object:', spinner) # <10>
spinner.start() # <11>
result = slow_function() # <12>
signal.go = False # <13>
spinner.join() # <14>
return result


def main():
result = supervisor() # <15>
print('Answer:', result)


if __name__ == '__main__':
main()
# END SPINNER_THREAD

python没有提供终止线程的API,==若想关闭线程,就必须结束线程对应的target函数==.这里使用的是signal.go属性:在主线程中把它设置为False后,spinner线程会干净的退出.

asynico.Future与concurrent.futures

asynico.Future类concurrent.futures类的接口基本一致,但是实现方式不同,不可以互换

asynico.Future类result方法没有参数,因此不能指定超时时间.

如果调用result方法时future还没有运行完,

  • concurrent.futures的result方法会阻塞等待结果.
  • asynico.Future的result方法会直接抛出asyncio.InvalidStateError异常.

从future,任务和协程中产出

可以使用yield from从asyncio.Futures对象中产出结果.

也就是说,对于foo协程函数,我们可以使用res = yileld from foo()

为了执行上面操作,必须排定协程的运行时间,然后使用asyncio.Task对象包装协程

asyncio.async(coro_or_future,*,loop=None)

这个函数统一了协程和future.

  • 第一个参数可以是协程或者future.如果是Future或者Task对象,那么就原封不动的返回.
  • 如果是协程,那么async函数胡会调用loop.create_task()方法创建Task对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
import time

async def do_some_work(x):
print('Waiting: ', x)

await asyncio.sleep(x)
return 'Done after {}s'.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

dones, pendings = await asyncio.wait(tasks)

for task in dones:
print('Task ret: ', task.result())


if __name__ == '__main__':
start = time.time()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', time.time() - start)
# Waiting: 1
# Waiting: 2
# Waiting: 4
# Task ret: Done after 1s
# Task ret: Done after 2s
# Task ret: Done after 4s
# TIME: 4.001962423324585
  • dones, pendings = await asyncio.wait(tasks)中的asyncio.wait()是一个协程,等待传给他的所有协程运行完毕后结束
  • dones, pendings = asyncio.wait(tasks)一般而言,pendings为空.
    wait有两个关键字参数timeoutreturn_when.如果设置了这两个参数,pendings就有可能不为空.

如果asyncio不理解,可以假装async和await关键字不存在,这样,代码就从异步变成同步了

yield from foo句法能防止阻塞的原因:
当前协程(即包含yield from代码的委派生成器)暂停后,控制权回到了时间循环中,再去驱动其他的协程,foo future或协程运行完毕后,吧结构返回给暂停的协程,将其恢复

使用Executor对象,防止阻塞事件循环

用于保存文件的函数异步要执行异步操作,但是asyncio包目前没有提供异步文件系统API.但是我们可以使用循环对象的run_in_executor方法.这个方法让函数在背后维护着一个ThreadPoolExecutor对象,这就是说,我们让函数在另一个进程里执行:

简单来说就是把阻塞作业交给线程池来做.

1
2
3
4
5
6
7
8
9
import asyncio

def save_file(file_url):
pass

file_url = 'xxx'

loop = asyncio.get_event_loop()
loop.run_inexecutor(None,save_file,file_url)

loop.run_inexecutor()

  • 第一个参数是Executor实例.如果设为None,使用事件循环默认的ThreadPoolExecutor实例
  • 其余参数是可调用的对象,以及可调用对象的位置参数

从回调到future和协程

在异步编程中,回调函数抛出的错误很难捕获.一般的操作就是为每个异步调用注册两个回调:

  • 一个用户处理操作成功时返回的结果
  • 一个用户处理错误

Python中的回调地狱:链式回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def stage1(response1):
request2 = step1(response1)
api_call2(request2, stage2)


def stage2(response2):
request3 = step2(response2)
api_call3(request3, stage3)


def stage3(response3):
step3(response3)


api_call1(request1, stage1)

使用协程和 yield from 结构做异步编程,无需使用回调

1
2
3
4
5
6
7
8
9
10
11
12
13
@asyncio.coroutine
def three_stages(request1):
response1 = yield from api_call1(request1)
# 第一步
request2 = step1(response1)
response2 = yield from api_call2(request2)
# 第二步
request3 = step2(response2)
response3 = yield from api_call3(request3)
# 第三步
step3(response3)

loop.create_task(three_stages(request1)) # 必须显式调度执行