通过线程以动画的形式显示文本式旋转指针:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import threading import itertools import time import sys
class Signal: go = True
def spin(msg, signal): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\\'): status = char + ' ' + msg write(status) flush() time.sleep(.1) write('\x08' * len(status)) if not signal.go: break write(' ' * len(status) + '\x08' * len(status))
def slow_function(): time.sleep(3) return 42
def supervisor(): signal = Signal() spinner = threading.Thread(target=spin, args=('thinking!', signal)) print('spinner object:', spinner) spinner.start() result = slow_function() signal.go = False spinner.join() return result
def main(): result = supervisor() print('Answer:', result)
if __name__ == '__main__': main()
|
python没有提供终止线程的API,==若想关闭线程,就必须结束线程对应的target函数==.这里使用的是signal.go属性:在主线程中把它设置为False后,spinner线程会干净的退出.
asynico.Future与concurrent.futures
asynico.Future类
与concurrent.futures类
的接口基本一致,但是实现方式不同,不可以互换
asynico.Future类
的result
方法没有参数,因此不能指定超时时间.
如果调用result方法时future还没有运行完,
- concurrent.futures的result方法会阻塞等待结果.
- asynico.Future的result方法会直接抛出asyncio.InvalidStateError异常.
从future,任务和协程中产出
可以使用yield from从asyncio.Futures对象中产出结果.
也就是说,对于foo协程函数,我们可以使用res = yileld from foo()
为了执行上面操作,必须排定协程的运行时间,然后使用asyncio.Task对象包装协程
asyncio.async(coro_or_future,*,loop=None)
这个函数统一了协程和future.
- 第一个参数可以是协程或者future.如果是Future或者Task对象,那么就原封不动的返回.
- 如果是协程,那么async函数胡会调用
loop.create_task()
方法创建Task对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import asyncio import time
async def do_some_work(x): print('Waiting: ', x)
await asyncio.sleep(x) return 'Done after {}s'.format(x)
async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4)
tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ]
dones, pendings = await asyncio.wait(tasks)
for task in dones: print('Task ret: ', task.result())
if __name__ == '__main__': start = time.time()
loop = asyncio.get_event_loop() loop.run_until_complete(main())
print('TIME: ', time.time() - start)
|
dones, pendings = await asyncio.wait(tasks)
中的asyncio.wait()
是一个协程,等待传给他的所有协程运行完毕后结束
dones, pendings = asyncio.wait(tasks)
一般而言,pendings为空.
wait有两个关键字参数timeout
和return_when
.如果设置了这两个参数,pendings就有可能不为空.
如果asyncio不理解,可以假装async和await关键字不存在,这样,代码就从异步变成同步了
yield from foo句法能防止阻塞的原因:
当前协程(即包含yield from代码的委派生成器)暂停后,控制权回到了时间循环中,再去驱动其他的协程,foo future或协程运行完毕后,吧结构返回给暂停的协程,将其恢复
使用Executor对象,防止阻塞事件循环
用于保存文件的函数异步要执行异步操作,但是asyncio包目前没有提供异步文件系统API.但是我们可以使用循环对象的run_in_executor
方法.这个方法让函数在背后维护着一个ThreadPoolExecutor
对象,这就是说,我们让函数在另一个进程里执行:
简单来说就是把阻塞作业交给线程池来做.
1 2 3 4 5 6 7 8 9
| import asyncio
def save_file(file_url): pass
file_url = 'xxx'
loop = asyncio.get_event_loop() loop.run_inexecutor(None,save_file,file_url)
|
loop.run_inexecutor()
- 第一个参数是Executor实例.如果设为None,使用事件循环默认的ThreadPoolExecutor实例
- 其余参数是可调用的对象,以及可调用对象的位置参数
从回调到future和协程
在异步编程中,回调函数抛出的错误很难捕获.一般的操作就是为每个异步调用注册两个回调:
- 一个用户处理操作成功时返回的结果
- 一个用户处理错误
Python中的回调地狱:链式回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| def stage1(response1): request2 = step1(response1) api_call2(request2, stage2)
def stage2(response2): request3 = step2(response2) api_call3(request3, stage3)
def stage3(response3): step3(response3)
api_call1(request1, stage1)
|
使用协程和 yield from 结构做异步编程,无需使用回调
1 2 3 4 5 6 7 8 9 10 11 12 13
| @asyncio.coroutine def three_stages(request1): response1 = yield from api_call1(request1) request2 = step1(response1) response2 = yield from api_call2(request2) request3 = step2(response2) response3 = yield from api_call3(request3) step3(response3)
loop.create_task(three_stages(request1))
|