网址
实验楼_python异步编程
Python异步IO之协程
Python黑魔法 — 异步IO
asyncio 学习笔记
async with 和 async for
1.GIL 线程资源共用,所以容易造成预想不到的效果,也就是说线程是不安全的. 如何解决线程安全问题?
CPython解释器使用了加锁的方法。每个进程有一把锁,启动线程先加锁,结束线程释放锁 。
打个比方,进程是一个厂房,厂房大门是开着的,门内有锁,工人进入大门后可以在内部上锁。厂房里面有10个车间对应10个线程,每个CPU就是一个工人。
进程 – 厂房
线程 – 车间
CPU – 工人
GIL(Global Interpreter Lock)全局锁就相当于厂房规定:工人要到车间工作,从厂房大门进去后要在里面反锁,完成工作后开锁出门,下一个工人再进门上锁。也就是说,任意时刻厂房里只能有一个工人,但这样就保证了工作的安全性,这就是GIL的原理。
当然了,GIL的存在有很多其它益处,包括简化 CPython 解释器和大量扩展的实现。
2.异步 所谓的异步,就是CPU在当前线程阻塞时可以去其它线程中工作,不管怎么设计,在一个线程内部代码都是顺序执行的 ,遇到IO都得阻塞,所谓的非阻塞,是遇到当前线程阻塞时,CPU去其它线程工作
异步:多任务,==多个任务之间执行没有先后顺序==,可以同时运行,执行的先后顺序不会有什么影响,存在的多条运行主线
同步:多任务,==多个任务之间执行的时候要求有先后顺序==,必须一个先执行完成之后,另一个才能继续执行,只有一个主线
阻塞:从调用者的角度出发,如果在调用的时候,被卡住,不能再继续向下运行,需要等待,就说是阻塞。
非阻塞:从调用者的角度出发,如果在调用的时候,没有被卡住,能够继续向下运行,无需等待,就说是非阻塞。
同异步与阻塞的区别:
同步和异步关注的是消息通信机制;
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。
3.协程 协程是在线程的基础上编写,由程序员决定代码执行顺序、可以互相影响的高耦合度代码的一种高级程序设计模式。
生成器函数的执行结果是生成器,注意这里所讲的“执行结果”不是函数的 return值。生成器终止时必定抛出 Stoplteration异常,for循环可以捕获此异常,异常的 value 属性值为生成器函数的 return值。
生成器会在yield语句处暂停,这是至关重要的,未来协程中的IO阻塞就出现在这里.
协程有四种存在状态:
GEN_CREATED 创建完成,等待执行
GEN _RUNNING 解释器正在执行(这个状态在下面的示例程序中无法看到)
GEN_SUSPENDED 在yield表达式处暂停
GEN_CLOSE 执行结束,生成器停止
预先激活生成器(或协程)可以使用next方法,也可以使用生成器的send方法发送None值:g.send(None)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import inspectfrom functools import wrapsdef deco (func ): @wraps(func ) def wrapper (*args,**kw ): g = func(*args,**kw) next (g) return g return wrapper @deco def gen (): i = 'hyl' while True : try : value = yield i except ValueError: print ('over' ) i = value g = gen() print (inspect.getgeneratorstate(g))
前文提到Stoplteration异常的value属性值为生成器(协程)函数的 return 值 ,我们可以在使用协程时捕获异常并得到这个值。举例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import inspectfrom functools import wrapsdef deco (func ): @wraps(func ) def wrapper (*args,**kw ): g = func(*args,**kw) next (g) return g return wrapper @deco def gen (): i = [] while True : value = yield if value == 'close' : break i.append(value) return i g = gen() g.send('hello' ) g.send('123' ) g.send('close' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import inspectfrom functools import wrapsdef deco (func ): @wraps(func ) def wrapper (*args,**kw ): g = func(*args,**kw) next (g) return g return wrapper @deco def gen (): i = [] while True : value = yield if value == 'close' : break i.append(value) return i g = gen() g.send('hello' ) g.send('123' ) try : g.send('close' ) except StopIteration as e: print (e.value)
4.生成器注意事项
当一个生成器对象被销毁时,或者主程序结束的时候会抛出一个GeneratorExit异常。
GeneratorExit异常的产生意味着生成器对象的生命周期已经结束。因此,一旦产生了GeneratorExit异常,生成器方法后续执行的语句中,不能再有yield语句,否则会产生RuntimeError。
throw:用来向生成器函数送入一个异常,如果生成器处理了这个异常,代码会向前执行到下一个yield,产生的值成为调用throw方法的返回值。
close方法会在生成器对象方法的挂起处抛出一个GeneratorExit异常 。
5.yield from yield from的两大优势
避免嵌套循环
转移控制权
yield from语句可以替代for循环,避免了嵌套循环.同yield一样,有 yield from语句的函数叫做协程函数或生成器函数.
yield from后面接收一个可迭代对象,在协程中,==可迭代对象往往是协程对象,这样就形成了嵌套协程==.
1 2 3 4 5 6 def func (): yield from range (1 ,10 ) yield from [1 ,2 ,3 ] for each in func(): print (each)
yield from允许子生成器直接从调用者接收其发送的信息或者抛出调用时遇到的异常,并且返回给委派生产器一个值。
所谓转移控制权
就是 yield from语法可以==将子生成器的控制权交给调用方main函数==,在main函数内部创建父生成器C,控制 c.send方法传值给子生成器 .
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 from functools import wrapsimport timefrom faker import Fakerdef coroutine (func ): @wraps(func ) def wrapper (*args, **kw ): g = func(*args, **kw) next (g) return g return wrapper def sub_coro (): l = [] while True : value = yield if value == 'CLOSE' : break l.append(value) return sorted (l) @coroutine def dele_coro (): while True : l = yield from sub_coro() print ('排序后的列表:' , l) print ('------------------' ) def main (): fake = Faker().country_code nest_country_list = [[fake() for i in range (3 )] for j in range (3 )] for country_list in nest_country_list: print ('国家代号列表:' , country_list) c = dele_coro() for country in country_list: c.send(country) c.send('CLOSE' ) if __name__ == '__main__' : main()
注意:
yield from 会自动预激子 生成器
父生成器的l = yield from sub_coro()
中的l
就会接受子生成器的return值. 也就是说,==l只有在子生成器结束的时候才会被赋值==.
父生成器就是一个通道而已,在上面代码中,通道没有做任何额外的事情
使用通道:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 def coroutine (func ): def wrapper (*args, **kw ): g = func(*args, **kw) next (g) return g return wrapper def add (): res = 0 while True : i = yield if i == 'close' : break else : res += i return res @coroutine def parent (): while True : a = yield from add() return a def main (): gen = parent() gen.send(1 ) gen.send(2 ) gen.send(3 ) gen.send(4 ) x = gen.send('close' ) print (x) try : main() except StopIteration as e: print (e)
不使用通道:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def coroutine (func ): def wrapper (*args, **kw ): g = func(*args, **kw) next (g) return g return wrapper @coroutine def add (): res = 0 while True : i = yield if i == 'close' : break else : res += i return res def main (): gen = add() gen.send(1 ) gen.send(2 ) gen.send(3 ) gen.send(4 ) x = gen.send('close' ) print (x) try : main() except StopIteration as e: print (e)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 def coroutine (func ): def wrapper (*args, **kw ): g = func(*args, **kw) next (g) return g return wrapper def sort_list (): res = [] while True : i = yield if i != 'close' : res.append(i) else : break return sorted (res) @coroutine def parent (): while True : a = yield from sort_list() return a def main (): g = parent() g.send(1 ) g.send(5 ) g.send(59 ) g.send(7 ) g.send('close' ) try : main() except StopIteration as e: print (e)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 def avg (): tot = 0 count = 0 while True : a = yield if a != None : tot += a count += 1 else : return tot/count def proxies (aDict,key ): while True : aDict[key] = yield from avg() def main (): data = {'hyl' :[95 ,96 ,97 ],'dsz' :[88 ,90 ]} res = {} for person,score in data.items(): gen = proxies(res,person) next (gen) for each in score: gen.send(each) gen.send(None ) print (res) if __name__ == '__main__' : main()
而yield from功能还不止于此,它还有一个主要的功能是省去了很多异常的处理 ,不再需要我们手动编写,其内部已经实现大部分异常处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def generator_1 (): total = 0 while True : x = yield print ('加' ,x) if not x: break total += x return total def generator_2 (): while True : total = yield from generator_1() print ('加和总数是:' ,total) def main (): g1 = generator_1() g1.send(None ) g1.send(2 ) g1.send(3 ) g1.send(None ) main()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 def generator_1 (): total = 0 while True : x = yield print ('加' ,x) if not x: break total += x return total def generator_2 (): while True : total = yield from generator_1() print ('加和总数是:' ,total) def main (): g2 = generator_2() g2.send(None ) g2.send(2 ) g2.send(3 ) g2.send(None ) main()
可见yield from封装了处理常见异常的代码。对于g2即便传入None也不报异常,其中total = yield from generator_1()
返回给total的值是generator_1()最终的return total
概念:
子生成器:yield from后的generator_1()生成器函数是子生成器
委托生成器:generator_2()是程序中的委托生成器,它负责委托子生成器完成具体任务。
调用方:main()是程序中的调用方,负责调用委托生成器。
yield from在其中的作用是:建立调用方和子生成器的通道
在上述代码中main()每一次在调用send(value)时,value不是传递给了委托生成器generator_2(),而是借助yield from传递给了子生成器generator_1()中的yield
同理,子生成器中的数据也是通过yield直接发送到调用方main()中。
6.线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 import timefrom concurrent import futuresdef printer (num ): time.sleep(1 ) print (num) def main (): with futures.ThreadPoolExecutor(5 ) as executor: executor.map (printer,range (4 )) if __name__ == '__main__' : main()
注意:executor.map(printer,range(4))
是将range(4)迭代出来的元素作为printer的参数.并不是整个range(4)作为priner的参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import timefrom concurrent import futuresdef printer (num ): time.sleep(1 ) print (num) def main (): with futures.ThreadPoolExecutor(5 ) as executor: executor.map (printer,((1 ,2 ),(3 ,4 ),(5 ,6 ),(7 ,8 ),(9 ,10 ))) if __name__ == '__main__' : main()
executor.__exit__方法会调用executor.shutdown(wait=True)
方法,等待所有线程都执行完毕后关闭线程池
future 是一种对象,表示异步执行的操作
。Future封装待完成的操作
7.支持获取返回值的线程 使用threading自定义一个类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import timeimport randomimport threadingclass Test (threading.Thread ): def foo (self ): time.sleep(2 ) return random.randint(0 ,5 ) def run (self ): self._return = self.foo() def get_result (self ): return self._return test1 = Test() test2 = Test() test1.start() test2.start() test1.join() test2.join() print (test1.get_result())print (test2.get_result())
使用threaing指定一个更抽象的类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from threading import Threadimport timedef printer (num ): while True : time.sleep(1 ) print (num) class ThreadWithReturnValue (Thread ): def run (self ): if self._target: self._return = self._target(*self._args, **self._kwargs) def get_result (self ): return self._return test = ThreadWithReturnValue(target=printer,args=(2 ,)) test.start()
使用future.ThreadPoolExecutor的map 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import timefrom concurrent import futuresdef func (num ): print ('---hyl' ) time.sleep(2 ) return ('this is return' ,num) with futures.ThreadPoolExecutor(3 ) as executor: result = executor.map (func,range (6 )) print (result) for each in result: print (each)
发现(‘this is return’, 0)都是最后才打印的,说明: 使用with
语句就能起到join()
的阻塞作用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import timefrom concurrent import futuresdef func (num ): print ('---hyl' ) time.sleep(2 ) return ('this is return' ,num) executor = futures.ThreadPoolExecutor(3 ) result = executor.map (func,range (6 )) print ('*------------' )for each in result: print (each) time.sleep(20 ) print ('**************************' )
发现print('**************************')
是最后执行的,这说明for循环阻塞了
说明必须等到result这个生成器所有之填充完毕后才会退出for循环
总结:
with
能够阻塞,必须等待result = executor.map(func,range(6))
才能退出with语句
result
也能阻塞,必须等待result这个生成器有值才会向下执行代码
Executor.map还有个特性比较有用,那就是这个函数返回==结果的顺序于调用开始的顺序是一致的==。如果第一个调用生成结果用时10秒,其他调用只用1秒,代码会阻塞10秒,获取map方法返回的生成器产出的第一个结果。
守护进程
一般创建了子进程,主线程都会等子进程完全执行完,主进程才会结束
守护线程就是主线程不等子进程,主线程做完程序立马结束。
设置守护进程
1 2 3 p=Process(target=func) p.daemon=True
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from multiprocessing import Processimport timedef func (): print ('*****' ) time.sleep(10 ) print ('我还活着' ) def main (): p = Process(target=func) p.daemon = True p.start() if __name__ == '__main__' : print ('-----------' ) main() print ('end' )
还可以使用join():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from multiprocessing import Processimport timedef func (): print ('*****' ) time.sleep(10 ) print ('我还活着' ) if __name__ == '__main__' : print ('-----------' ) p = Process(target=func) p.daemon = True p.start() p.join() print ('end' )
守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
守护子进程和非守护子进程可以并存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from multiprocessing import Processfrom threading import Threadimport time,osdef foo (): print (123 ) time.sleep(1 ) print ("end123" ) def bar (): print (456 ) time.sleep(3 ) print ("end456" ) if __name__ == '__main__' : p1 = Process(target=foo) p2 = Process(target=bar) p1.daemon=True p1.start() p2.start() print ("main-------" )
守护线程
主进程在其代码结束后就已经算运行完毕了 (守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
主线程在其他非守护线程运行完毕后才算运行完毕 (守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from multiprocessing import Processfrom threading import Threadimport os,time,randomdef task (): for each in range (1000 ): print (each) print ('%s is done' %os.getpid()) if __name__ == '__main__' : t = Thread(target=task) t.daemon = True t.start() print ('主' )
守护子线程非守护子进程并存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from threading import Threadimport timedef foo (): print (123 ) time.sleep(100 ) print ("end123" ) def bar (): print (456 ) time.sleep(3 ) print ("end456" ) if __name__ == '__main__' : t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon=True t1.start() t2.start() print ("main-------" )
注意:
主线程在其他非守护线程运行完毕后才算运行完毕
可能发生其他非守护线程过晚结束,守护线程执行完毕 的现象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from threading import Threadimport timedef foo (): print (123 ) time.sleep(1 ) print ("end123" ) def bar (): print (456 ) time.sleep(10 ) print ("end456" ) if __name__ == '__main__' : t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon=True t1.start() t2.start() print ("main-------" )
8.asyncio模块 基础概念 @asyncio.coroutine
:asyncio模块中的装饰器,用于将一个生成器声明为协程。
yield from 其实就是等待另外一个协程的返回。
简单来说: @asyncio.coroutine装饰器是协程函数的标志,我们需要在每一个任务函数前加这个装饰器,并在函数中使用yield from。
==在协程中yield from后面必须是子生成器函数==
在Python 3.5开始引入了新的语法async和await,以简化并更好地标识异步IO。
使用async
代替@asyncio.coroutine
使用await
代替yield from
我们可以使用async修饰将普通函数和生成器函数包装成异步函数和异步生成器。
异步函数(协程)
1 2 async def async_function (): return 1
异步生成器
1 2 async def async_generator (): yield 1
所以,函数可以分成四种:
1 2 3 4 5 import typesprint (type (function) is types.FunctionType)print (type (generator()) is types.GeneratorType)print (type (async_function()) is types.CoroutineType)print (type (async_generator()) is types.AsyncGeneratorType)
在协程函数中,可以通过await语法来挂起自身的协程,并等待另一个协程完成直到返回结果:await后面的对象需要是一个Awaitable 。await的目的是等待协程控制流的返回 ,而实现暂停并挂起函数的操作是yield
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 async def async_function (): return 1 async def await_coroutine (): result = await async_function() print (result) def run (coroutine ): try : coroutine.send(None ) except StopIteration as e: return e.value run(await_coroutine())
event_loop 事件循环: 程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
coroutine 协程: 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
task 任务: 一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装 ,其中包含任务的各种状态。
future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别 (task对象是Future类的子类)
async/await 关键字: python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口 。
走个流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import timeimport asyncioasync def taskIO_1 (): print ('开始运行IO任务1...' ) await asyncio.sleep(2 ) print ('IO任务1已完成,耗时2s' ) return taskIO_1.__name__ async def taskIO_2 (): print ('开始运行IO任务2...' ) await asyncio.sleep(3 ) print ('IO任务2已完成,耗时3s' ) return taskIO_2.__name__ async def main (): tasks = [taskIO_1(), taskIO_2()] done,pending = await asyncio.wait(tasks) for r in done: print ('协程无序返回值:' +r.result()) if __name__ == '__main__' : start = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(main()) print ('所有IO任务总耗时%.5f秒' % float (time.time()-start))
使用asyncio.coroutine装饰器 Python3.3的yield from语法可以把生成器的操作委托给另一个生成器,生成器的调用方可以直接与子生成器进行通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def sub_gen (): yield 1 yield 2 yield 3 def gen (): return (yield from sub_gen()) def main (): for val in gen(): print (val)
利用这一特性,使用yield from能够编写出类似协程效果的函数调用,在3.5之前,asyncio正是使用@asyncio.coroutine和yield from语法来创建协程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import asyncio@asyncio.coroutine def compute (x, y ): print ("Compute %s + %s ..." % (x, y)) yield from asyncio.sleep(1.0 ) return x + y @asyncio.coroutine def print_sum (x, y ): result = yield from compute(x, y) print ("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1 , 2 )) loop.close()
然而,用yield from容易在表示协程和生成器中混淆,没有良好的语义性,所以在Python 3.5推出了更新的async/await表达式来作为协程的语法。
因此类似以下的调用是等价的:
1 2 3 4 5 async with lock: ... with (yield from lock): ...
1 2 3 4 5 def main (): return (yield from coro()) def main (): return (await coro())
初步使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asynciofuture = asyncio.Future() async def coro1 (): print ('--- coro1 ---' ) await asyncio.sleep(.1 ) future.set_result('data' ) async def coro2 (): print ('--- coro2 ---' ) print (await future) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([ coro1(), coro2(), ])) loop.close()
两个协程在在事件循环中,协程coro1在执行第一句后挂起自身切到asyncio.sleep,而协程coro2一直等待future的结果,让出事件循环,计时器结束后coro1执行了第二句设置了future的值,被挂起的coro2恢复执行,打印出future的结果’data’。
future可以被await证明了future对象是一个Awaitable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import timeimport asyncio@asyncio.coroutine def taskIO_1 (): print ('开始运行IO任务1...' ) yield from asyncio.sleep(2 ) print ('IO任务1已完成,耗时2s' ) return taskIO_1.__name__ @asyncio.coroutine def taskIO_2 (): print ('开始运行IO任务2...' ) yield from asyncio.sleep(3 ) print ('IO任务2已完成,耗时3s' ) return taskIO_2.__name__ @asyncio.coroutine def main (): tasks = [taskIO_1(), taskIO_2()] done,pending = yield from asyncio.wait(tasks) for r in done: print ('协程无序返回值:' +r.result()) if __name__ == '__main__' : start = time.time() loop = asyncio.get_event_loop() try : loop.run_until_complete(main()) finally : loop.close() print ('所有IO任务总耗时%.5f秒' % float (time.time()-start))
上面代码先通过get_event_loop()获取了一个标准事件循环loop(因为是一个,所以协程是单线程)
然后,我们通过run_until_complete(main())来运行协程(此处把调用方协程main()作为参数,调用方负责调用其他委托生成器),run_until_complete的特点就像该函数的名字,直到循环事件的所有事件都处理完才能完整结束。
进入调用方协程,我们把多个任务[taskIO_1()和taskIO_2()]放到一个task列表中,可理解为打包任务。
使用==asyncio.wait(tasks)来获取一个awaitable objects==,即可等待对象的集合 (此处的aws是协程的列表),并发运行传入的aws,同时通过yield from返回一个包含(done, pending)的元组,done表示已完成的任务列表,pending表示未完成的任务列表;
如果使asyncio.as_completed(tasks)则会按完成顺序生成协程的迭代器(常用于for循环中),因此当你用它迭代时,会尽快得到每个可用的结果。
此外,当轮询到某个事件时(如taskIO_1()),直到遇到该任务中的yield from中断,开始处理下一个事件(如taskIO_2())),当yield from后面的子生成器完成任务时,该事件才再次被唤醒
因为done里面有我们需要的返回结果,但它目前还是个任务列表,所以要取出返回的结果值,我们遍历它并逐个调用result()取出结果即可。
(注:对于asyncio.wait()和asyncio.as_completed()返回的结果均是先完成的任务结果排在前面,所以此时打印出的结果不一定和原始顺序相同,但使用gather()的话可以得到原始顺序的结果集)
最后我们通过loop.close()关闭事件循环。
使用async和await实现协程 asyncio的使用可分三步走:
创建事件循环
指定循环模式并运行
关闭循环
简单来说就是:
将协程对象转为task任务对象
定义一个事件循环对象容器用来存放task
将task任务扔进事件循环对象中并触发
1.定义一个协程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import timeimport asyncioasync def do_some_work (x ): print ('Waiting: ' , x) start = time.time() coroutine = do_some_work(2 ) print (coroutine)loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) print ('TIME: ' , time.time() - start)
通过async关键字定义一个协程(coroutine),协程也是一种对象。协程不能直接运行,需要把协程加入到事件循环(loop),事件循环在适当的时候调用协程。
asyncio.get_event_loop
方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。
2.创建一个task 在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象 。
==task对象是Future类的子类==。保存了协程运行后的状态,用于未来获取协程的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import asyncioimport timeasync def do_some_work (x ): print ('Waiting: ' , x) start = time.time() coroutine = do_some_work(2 ) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) print (task)loop.run_until_complete(task) print (task)print ('TIME: ' , time.time() - start)
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task
run_until_complete的参数是一个futrue对象。 当传入一个协程,其内部会自动封装成task,==task是Future的子类==。isinstance(task, asyncio.Future)
将会输出True。
3.绑定回调 绑定回调,在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import timeimport asyncioasync def do_some_work (x ): print ('Waiting: ' , x) return 'Done after {}s' .format (x) def callback (future ): print ('Callback: ' , future.result()) start = time.time() coroutine = do_some_work(2 ) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) print (task)task.add_done_callback(callback) loop.run_until_complete(task) print ('TIME: ' , time.time() - start)print (task)
如果回调需要多个参数,可以通过偏函数导入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import timeimport asyncioimport functoolsasync def do_some_work (x ): print ('Waiting: ' , x) return 'Done after {}s' .format (x) def callback (t, future ): print ('Callback:' , t, future.result()) start = time.time() coroutine = do_some_work(2 ) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) print (task)task.add_done_callback(functools.partial(callback, 'hyl' )) loop.run_until_complete(task) print ('TIME: ' , time.time() - start)print (task)
coroutine执行结束时候会调用回调函数。并通过参数future获取协程执行的结果。 我们创建的task和回调里的future对象,实际上是同一个对象。
4.future 与 result
前面不绑定回调的例子中,我们可以看到task有fiinished状态。
当task处于fiinished状态,我们可以直接使用task的result方法获取返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import timeimport asyncioasync def do_some_work (x ): print ('Waiting {}' .format (x)) return 'Done after {}s' .format (x) start = time.time() coroutine = do_some_work(2 ) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) loop.run_until_complete(task) print ('Task ret: {}' .format (task.result()))print ('TIME: {}' .format (time.time() - start))
5.阻塞和await
使用async可以定义协程对象
使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。
协程遇到await,==事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行==。
我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import asyncioimport timeasync def do_some_work (x ): print ('Waiting: ' , x) await asyncio.sleep(x) return 'Done after {}s' .format (x) start = time.time() coroutine = do_some_work(2 ) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) loop.run_until_complete(task) print ('Task ret: ' , task.result())print ('TIME: ' , time.time() - start)
在 asyncio.sleep的时候,使用await让出控制权。
即当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。现在我们的例子就用耗时的阻塞操作了。
6.并发和并行 asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。创建多个协程的列表,然后将这些协程注册到事件循环中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import asyncioimport timeasync def do_some_work (x ): print ('Waiting: ' , x) await asyncio.sleep(x) return 'Done after {}s' .format (x) start = time.time() coroutine1 = do_some_work(1 ) coroutine2 = do_some_work(2 ) coroutine3 = do_some_work(4 ) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) print ('--------' )for task in tasks: print ('Task ret: ' , task.result()) print ('TIME: ' , time.time() - start)
总时间为4s左右。4s的阻塞时间,足够前面两个协程执行完毕。如果是同步顺序的任务,那么至少需要7s。此时我们使用了aysncio实现了并发。
asyncio.wait(tasks)
也可以使用 asyncio.gather(*tasks)
,前者接受一个task列表,后者接收一堆task。
wait()官方文档用法如下:
done, pending = await asyncio.wait(aws)
7.协程嵌套 嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import asyncioimport timeasync def do_some_work (x ): print ('Waiting: ' , x) await asyncio.sleep(x) return 'Done after {}s' .format (x) async def main (): coroutine1 = do_some_work(1 ) coroutine2 = do_some_work(2 ) coroutine3 = do_some_work(4 ) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] dones, pendings = await asyncio.wait(tasks) for task in dones: print ('Task ret: ' , task.result()) if __name__ == '__main__' : start = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(main()) print ('TIME: ' , time.time() - start)
8.协程停止 future对象有几个状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import asyncioimport timeasync def do_some_work (x ): print ('Waiting: ' , x) await asyncio.sleep(x) return 'Done after {}s' .format (x) coroutine1 = do_some_work(1 ) coroutine2 = do_some_work(2 ) coroutine3 = do_some_work(2 ) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] start = time.time() loop = asyncio.get_event_loop() try : loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e: print (asyncio.Task.all_tasks()) for task in asyncio.Task.all_tasks(): print (task.cancel()) loop.stop() loop.run_forever() finally : loop.close() print ('TIME: ' , time.time() - start)
启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。可以看到输出如下:
1 2 3 4 5 6 7 8 9 Waiting: 1 Waiting: 2 Waiting: 2 {<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>} True True True True TIME: 0.8858370780944824
True表示cannel成功
9.不同线程的事件循环
很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。
当前线程创建一个事件循环,然后再新建一个线程,在新线程中启动事件循环。当前线程不会被block。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import asyncioimport timefrom threading import Threaddef start_loop (loop ): asyncio.set_event_loop(loop) loop.run_forever() def more_work (x ): print ('More work {}' .format (x)) time.sleep(x) print ('Finished more work {}' .format (x)) start = time.time() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print ('TIME: {}' .format (time.time() - start))new_loop.call_soon_threadsafe(more_work, 6 ) new_loop.call_soon_threadsafe(more_work, 3 )
async with 和async for 用法说明 async with 异步上下文管理器指的是在enter和exit方法处能够暂停执行的上下文管理器 。
为了实现这样的功能,需要加入两个新的方法:__aenter__ 和__aexit__。这两个方法都要返回一个 awaitable类型的值。
1 2 3 4 5 6 class AsyncContextManager : async def __aenter__ (self ): await log('entering context' ) async def __aexit__ (self, exc_type, exc, tb ): await log('exiting context' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 async with EXPR as VAR: BLOCK mgr = (EXPR) aexit = type (mgr).__aexit__ aenter = type (mgr).__aenter__(mgr) exc = True VAR = await aenter try : BLOCK except : if not await aexit(mgr, *sys.exc_info()): raise else : await aexit(mgr, None , None , None )
async for
异步迭代器就是能够在next方法中调用异步代码 。
一个异步可迭代对象(asynchronous iterable)能够在迭代过程中调用异步代码
为了支持异步迭代:
一个对象必须实现__aiter__方法,该方法返回一个异步迭代器(asynchronous iterator)对象。
一个异步迭代器对象必须实现__anext__方法,该方法返回一个awaitable类型的值。
为了停止迭代,__anext__必须抛出一个StopAsyncIteration异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 class AsyncIterable : def __aiter__ (self ): return self async def __anext__ (self ): data = await self.fetch_data() if data: return data else : raise StopAsyncIteration async def fetch_data (self ): ...
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 async for TARGET in ITER: BLOCK else : BLOCK2 iter = (ITER)iter = type (iter ).__aiter__(iter )running = True while running: try : TARGET = await type (iter ).__anext__(iter ) except StopAsyncIteration: running = False else : BLOCK else : BLOCK2
补充 1.基础知识补充
loop = asyncio.get_event_loop()
: 每个线程中只能有一个事件循环,get_event_loop 方法会获取当前已经存在的事件循环,如果当前线程中没有,就会新建一个
loop.run_until_complete(coroutine)
:
将协程对象注入到事件循环,协程的运行由事件循环控制。
事件循环的 run_until_complete 方法会阻塞运行,直到任务全部完成。
协程对象作为 run_until_complete 方法的参数,loop 会自动将协程对象包装成任务来运行。
任务对象保存了协程运行后的状态,用于未来获取协程的结果。
为什么要使用协程对象创建任务? 因为在这个过程中 asyncio.Task 做了一些工作,包括预激协程、协程运行中遇到某些异常时的处理
协程对象不能直接运行,必须放入事件循环中或者由 yield from 语句调用 。
回调函数的最后一个参数是 future 或 task 对象 ,通过该对象可以获取协程返回值。如果回调需要多个参数,可以通过偏函数导入。
1 2 3 4 5 6 def callback (name, task ): print ('[callback] Hello {}' .format (name)) print ('[callback] coroutine state: {}' .format (task._state)) task = loop.create_task(coroutine) task.add_done_callback(functools.partial(callback, 'hyl' ))
其实多数情况下无需调用 task 的 add_done_callback 方法,可以直接把回调函数中的代码写入 await 语句后面
asyncio.sleep 与 time.sleep 是不同的
实际项目中,往往有多个协程创建多个任务对象,同时在一个 loop 里运行。 为了把多个协程交给 loop,需要借助 asyncio.gather 方法。
asyncio.gather 方法中参数的顺序决定了协程的运行顺序 (asyncio.wait也是按顺序执行)
使用 asyncio.gather 方法,任务的 result 方法可以获得对应的协程函数的 return 值。
多数情况下同样无需调用 task 的 result 方法获取协程函数的 return 值,因为事件循环的 run_until_complete 方法的返回值就是协程函数的 return 值
2.全流程解析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import timeimport asynciodef four (): start = time.time() async def corowork (name, t ): print ('[corowork] Start coroutine' , name) await asyncio.sleep(t) print ('[corowork] Stop coroutine' , name) return 'Coroutine {} OK' .format (name) loop = asyncio.get_event_loop() coroutine1 = corowork('ONE' , 3 ) coroutine2 = corowork('TWO' , 1 ) task1 = loop.create_task(coroutine1) task2 = loop.create_task(coroutine2) gather = asyncio.gather(task1, task2) loop.run_until_complete(gather) print ('[task1] ' , task1.result()) print ('[task2] ' , task2.result()) end = time.time() print ('运行耗时:{:.4f}' .format (end - start)) four()
首先运行 task1
打印 [corowork] Start coroutine ONE
遇到 asyncio.sleep 阻塞
释放 CPU 转到 task2 中执行
打印 [corowork] Start coroutine TWO
再次遇到 asyncio.sleep 阻塞
这次没有其它协程可以运行了,只能等阻塞结束
task2 的阻塞时间较短,阻塞 1 秒后先结束,打印 [corowork] Stop coroutine TWO
又过了 2 秒,阻塞 3 秒的 task1 也结束了阻塞,打印 [corowork] Stop coroutine ONE
至此两个任务全部完成,事件循环停止
打印两个任务的 result
打印程序运行时间
程序全部结束
3.是否要关闭事件循环 事件循环有一个 stop 方法用来停止循环和一个 close 方法用来关闭循环。以上示例中都没有调用 loop.close 方法,似乎并没有什么问题。所以到底要不要调用 loop.close 呢?
简单来说,loop 只要不关闭,就还可以再次运行 run_until_complete 方法,关闭后则不可运行。有人会建议调用 loop.close,彻底清理 loop 对象防止误用,其实多数情况下根本没有这个必要。
4.asyncio.gather 和 asyncio.wait 的异同
5.取消任务的两种方法
loop.stop() : 事件循环的 stop 方法取消所有任务 ,停止事件循环
任务的 cancel 方法也可以取消任务,而 asyncio.Task.all_tasks 方法可以获得事件循环中的全部任务
loop.stop PENDING 状态的任务才能被取消,FINISHED 状态的任务已经完成,不能取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import asyncioasync def work (id , t ): print ('Wroking...' ) await asyncio.sleep(t) print ('Work {} done' .format (id )) def main (): loop = asyncio.get_event_loop() coroutines = [work(i, i) for i in range (1 , 4 )] try : loop.run_until_complete(asyncio.gather(*coroutines)) except KeyboardInterrupt: loop.stop() finally : loop.close() if __name__ == '__main__' : main()
首先,id 为 1 的协程先启动运行
打印 Working…
遇到 IO 阻塞,释放 CPU ,CPU 去到 id 为 2 的协程中运行
同样首先打印 Working…
遇到 IO 阻塞,同样释放 CPU ,第三个协程开始运行,打印 Working…
以上步骤瞬间完成,这时候的 loop 中全部协程处于阻塞状态
一秒钟后,id 为 1 的协程结束阻塞
打印 Work 1 done
然后手动按下快捷键 Ctrl + C ,触发 KeyboardInterrupt 异常
try except 语句捕获异常,执行 # 3 和 # 4
程序运行完毕
task.cancel 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import asyncioasync def work (id , t ): print ('Wroking...' ) await asyncio.sleep(t) print ('Work {} done' .format (id )) def main (): loop = asyncio.get_event_loop() coroutines = [work(i, i) for i in range (1 , 4 )] try : loop.run_until_complete(asyncio.gather(*coroutines)) except KeyboardInterrupt: print () tasks = asyncio.Task.all_tasks() for i in tasks: print ('取消任务:{}' .format (i)) print ('取消状态:{}' .format (i.cancel())) finally : loop.close() main()
6.loop.run_forever 无限循环 排定任务 排定 task / future 在事件循环中的执行顺序,也就是对应的协程先执行哪个,遇到 IO 阻塞时,CPU 转而运行哪个任务,这是我们在进行异步编程时的一个需求。
前文所示的多任务程序中,事件循环里的任务的执行顺序由 asyncio.ensure_future / loop.create_task 和 asyncio.gather 排定。
run_forever 方法为无限运行事件循环,需要自定义 loop.stop 方法并执行之才会停止。
有两种执行loop.stop的方法:
单任务事件循环,将 loop 作为参数传入协程函数创建协程,在协程内部执行 loop.stop 方法停止事件循环。
多任务事件循环,使用回调函数执行 loop.stop 停止事件循环
在协程内部执行loop.stop方法停止事件循环 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import asyncioasync def work (loop, t ): print ('start' ) await asyncio.sleep(t) print ('after {}s stop' .format (t)) loop.stop() loop = asyncio.get_event_loop() task = asyncio.ensure_future(work(loop, 1 )) loop.run_forever() loop.close()
使用回调函数执行 loop.stop 停止事件循环 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import timeimport asyncioimport functoolsdef loop_stop (loop, future ): loop.stop() async def work (t ): print ('start' ) await asyncio.sleep(t) print ('after {}s stop' .format (t)) def main (): loop = asyncio.get_event_loop() tasks = asyncio.gather(work(1 ), work(2 )) tasks.add_done_callback(functools.partial(loop_stop, loop)) loop.run_forever() loop.close() if __name__ == '__main__' : start = time.time() main() end = time.time() print ('耗时:{:.4f}s' .format (end - start))
7.将普通函数作为任务注入事件循环的三种方法
loop.call_soon
loop.call_later
loop.call_at & loop.time
这三个 call_xxx 方法的作用都是将普通函数 作为任务排定到事件循环中,返回值都是 asyncio.events.TimerHandle 实例
注意:它们不是协程任务 ,不能作为 loop.run_until_complete 的参数。
loop.call_soon 事件循环的 call_soon 方法可以将普通函数作为任务加入到事件循环并立即排定任务的执行顺序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import asyncioimport timedef hello (name ): print ('[hello] Hello, {}' .format (name)) async def work (t, name ): print ('[work ] start' , name) await asyncio.sleep(t) print ('[work ] {} after {}s stop' .format (name, t)) def main (): loop = asyncio.get_event_loop() asyncio.ensure_future(work(1 , 'A' )) loop.call_soon(hello, 'Tom' ) loop.create_task(work(2 , 'B' )) loop.run_until_complete(work(3 , 'C' )) if __name__ == '__main__' : main()
loop.call_later 此方法同 loop.call_soon 一样,可将普通函数作为任务放到事件循环里,不同之处在于此方法可延时执行,第一个参数为延时时间 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import asyncioimport functoolsdef hello (name ): print ('[hello] Hello, {}' .format (name)) async def work (t, name ): print ('[work{}] start' .format (name)) await asyncio.sleep(t) print ('[work{}] stop' .format (name)) def main (): loop = asyncio.get_event_loop() asyncio.ensure_future(work(1 , 'A' )) loop.call_later(1.2 , hello, 'Tom' ) loop.call_soon(hello, 'Kitty' ) task4 = loop.create_task(work(2 , 'B' )) loop.call_later(1 , hello, 'Jerry' ) loop.run_until_complete(task4) if __name__ == '__main__' : main()
毫无疑问,这五个任务在一个事件循环里是顺序执行,遇到阻塞执行下一个,程序执行顺序如下:
首先执行任务一,打印一行后阻塞 1 秒,执行任务二
任务二是 call_later 1.2 秒,就相当于一个 1.2 秒的 asyncio.sleep
==注意,call_later 这个延时 1.2 秒是事件循环启动时就开始计时的 (所以任务二阻塞,执行任务三)==
因为任务二阻塞,执行任务三,
接着执行任务四,打印一行后阻塞 2 秒
接着执行任务五,因为 call_later 1 秒,发生阻塞,第一轮结束.
以上是五个任务第一轮的执行情况, 第二轮开始前,此时所有的任务(任务1,任务2,任务4.任务5)都在阻塞中,所以CPU 一直候着
第一个发出执行信号的是任务五,它只阻塞 1 秒 (上面已经说了,这个 1 秒是从事件循环启动时开始算,所以这个阻塞肯定比任务一的阻塞 1 秒先结束)
CPU 执行完任务五,任务一也阻塞结束了,执行任务一
然后是任务二,最后是任务四
第二轮打印了 4 行,全部任务完成,停止事件循环
总结 :
第一轮 : 任务1 - 任务3 - 任务4
第二轮 : 任务5 - 任务1 - 任务2 - 任务4
我们可以总结事件循环的执行顺序:
首先按顺序执行各个任务,一旦任务阻塞,就跳转到下一个任务,直到第一轮结束
接下来,看哪个任务准备好了,就执行哪个任务 (如果都准备好了就执行前面的任务)
loop.call_at & loop.time
call_soon 立刻执行
call_later 延时执行
call_at 在某时刻执行
loop.time 就是事件循环内部的一个计时方法,返回值是时刻,数据类型是 float
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import asyncioimport functoolsdef hello (name ): print ('[hello] Hello, {}' .format (name)) async def work (t, name ): print ('[work{}] start' .format (name)) await asyncio.sleep(t) print ('[work{}] stop' .format (name)) def main (): loop = asyncio.get_event_loop() start = loop.time() asyncio.ensure_future(work(1 , 'A' )) loop.call_at(start+1.2 , hello, 'Tom' ) loop.call_soon(hello, 'Kitty' ) task4 = loop.create_task(work(2 , 'B' )) loop.call_at(start+1 , hello, 'Jerry' ) loop.run_until_complete(task4) main()
8.协程的通讯方法 asyncio.lock 协程锁: asyncio.lock 应该叫做异步 IO 锁,之所以叫协程锁,是因为它通常使用在子协程中,其作用是将协程内部的一段代码锁住,直到这段代码运行完毕解锁。
协程锁的固定用法是使用 async with 创建协程锁的上下文环境 ,将代码块写入其中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 import asynciol = [] lock = asyncio.Lock() async def work (name ): print (r'\\\\\\\\\\\\' ,name) async with lock: print ('{} start' .format (name)) if 'x' in l: return name print ('++++++++++' ,name) await asyncio.sleep(0 ) print ('----------' ,name) l.append('x' ) print ('{} end' .format (name)) return name async def one (): name = await work('one' ) print ('{} ok' .format (name)) async def two (): print ('///////////' ) name = await work('two' ) print ('{} ok' .format (name)) def main (): loop = asyncio.get_event_loop() tasks = asyncio.wait([one(), two()]) loop.run_until_complete(tasks) if __name__ == '__main__' : main()
执行步骤解析:
协程one当执行到++++++++++ one
后执行await asyncio.sleep(0)
,此时lock并没有解开,就切换协程two了
协程two开始执行,打印///////////
协程two执行完\\\\\\\\\\\\ two
,后执行async with lock
,但是此时lock并没有解开.切换到协程one
协程one恢复执行---------- one
.当执行完return name
后lock解开.
协程one执行完one ok
,协程one执行完毕,开始执行协程two
执行协程two
示例二
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import asyncioimport functoolsdef unlock (lock ): print ('callback releasing lock' ) lock.release() async def coro1 (lock ): print ('coro1 wating for the lock' ) with await lock: print ('coro1 acquired lock' ) print ('coro1 released lock' ) async def coro2 (lock ): print ('coro2 wating for the lock' ) await lock try : print ('coro2 acquired lock' ) finally : print ('coro2 released lock' ) lock.release() async def main (loop ): lock = asyncio.Lock() print ('acquiring the lock before starting coroutines' ) await lock.acquire() print ('lock acquired: {}' .format (lock.locked())) loop.call_later(0.1 , functools.partial(unlock, lock)) print ('waiting for coroutines' ) await asyncio.wait([coro1(lock), coro2(lock)]) event_loop = asyncio.get_event_loop() try : event_loop.run_until_complete(main(event_loop)) finally : event_loop.close()
asyncio.Event asyncio.Event
类似 threading.Event
用来允许多个消费者等待某个事情发生,不用通过监听一个特殊的值的来说实现类似通知 的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import asyncioimport functoolsdef set_event (event ): print ('setting event in callback' ) event.set () async def coro1 (event ): print ('coro1 waiting for event' ) await event.wait() print ('coro1 triggered' ) async def coro2 (event ): print ('coro2 waiting for event' ) await event.wait() print ('coro2 triggered' ) async def main (loop ): event = asyncio.Event() print ('event start state: {}' .format (event.is_set())) loop.call_later( 0.1 , functools.partial(set_event, event) ) await asyncio.wait([coro1(event), coro2(event)]) print ('event end state: {}' .format (event.is_set())) event_loop = asyncio.get_event_loop() try : event_loop.run_until_complete(main(event_loop)) finally : event_loop.close()
和 Lock
一样,coro1()
和 coro2()
都在等待 event 被设置。
不同的是它们都在 event 状态一发生变化的时候就启动了,它们不需要对 event 对象获取一个唯一的所有权。
简单来说 : event 能唤醒所有等待中的 coroutine
asyncio.condition Condition
的效果类似 Event
,不同的是它不是唤醒所有等待中的 coroutine, 而是通过 notify()
唤醒指定数量的待唤醒 coroutine。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 import asyncioasync def consumer (condition, n ): with await condition: print ('consumer {} is waiting' .format (n)) await condition.wait() print ('consumer {} triggered' .format (n)) print ('ending consumer {}' .format (n)) async def manipulate_condition (condition ): print ('starting manipulate_condition' ) await asyncio.sleep(0.1 ) for i in range (1 , 3 ): with await condition: print ('notifying {} consumers' .format (i)) condition.notify(n=i) await asyncio.sleep(0.1 ) with await condition: print ('notifying remaining consumers' ) condition.notify_all() print ('ending manipulate_condition' ) async def main (loop ): condition = asyncio.Condition() consumers = [ consumer(condition, i) for i in range (5 ) ] loop.create_task(manipulate_condition(condition)) await asyncio.wait(consumers) event_loop = asyncio.get_event_loop() try : result = event_loop.run_until_complete(main(event_loop)) finally : event_loop.close()
启动了五个 Condition 的消费者,每个都使用 wait()
方法来等待它们可以继续处理的通知。
manipulate_condition()
首先通知了一个消费者,然后又通知了两个消费者,最后通知剩下的所有消费者。
在线程中使用condition需要注意: 必须每个线程先获取conditioncond.acquire()
,最后必须释放cond.release()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import threading, timeclass Hider (threading.Thread ): def __init__ (self, cond, name ): super (Hider, self).__init__() self.cond = cond self.name = name def run (self ): time.sleep(1 ) self.cond.acquire() print (self.name + ': 我已经把眼睛蒙上了' ) self.cond.notify() self.cond.wait() print (self.name + ': 我找到你了哦 ~_~' ) self.cond.notify() self.cond.release() print (self.name + ': 我赢了' ) class Seeker (threading.Thread ): def __init__ (self, cond, name ): super (Seeker, self).__init__() self.cond = cond self.name = name def run (self ): self.cond.acquire() self.cond.wait() print (self.name + ': 我已经藏好了,你快来找我吧' ) self.cond.notify() self.cond.wait() self.cond.release() print (self.name + ': 被你找到了,哎~~~' ) cond = threading.Condition() seeker = Seeker(cond, 'seeker' ) hider = Hider(cond, 'hider' ) seeker.start() hider.start()
在协程里也一样,每个协程函数都必须先获取condition,最后释放 .这里我们使用了with自动获取和释放
1 2 3 with await condition: print ('consumer {} is waiting' .format (n)) await condition.wait()
注意: 为了更明显设置了新特性:with await
要改为async with
:
1 2 3 4 5 with await condition: print ('consumer {} is waiting' .format (n)) async with condition: print ('consumer {} is waiting' .format (n))
asyncio.Queue asyncio.Queue
为 coroutines 实现了一个先进先出的数据结构,类似多线程中的 queue.Queue
,多进程中的 multiprocessing.Queue
queue.Queue
的queue.task_done()的功能 : 每task_done一次 就从队列里删掉一个元素,这样在最后queue.join()的时候根据队列长度是否为零来判断队列是否结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 import asyncioasync def consumer (n, q ): print ('consumer {}: waiting for item' .format (n)) while True : print ('consumer {}: waiting for item' .format (n)) item = await q.get() print ('consumer {}: has item {}' .format (n, item)) if item is None : q.task_done() break else : await asyncio.sleep(0.01 * item) q.task_done() print ('consumer {}: ending' .format (n)) async def producer (q, num_workers ): print ('producer: starting' ) for i in range (num_workers * 3 ): await q.put(i) print ('producer: added task {} to the queue' .format (i)) print ('producer: adding stop signals to the queue' ) for i in range (num_workers): await q.put(None ) print ('producer: waiting for queue to empty' ) await q.join() print ('producer: ending' ) async def main (loop, num_consumers ): q = asyncio.Queue(maxsize=num_consumers) consumers = [ loop.create_task(consumer(i, q)) for i in range (num_consumers) ] prod = loop.create_task(producer(q, num_consumers)) await asyncio.wait(consumers + [prod]) event_loop = asyncio.get_event_loop() try : event_loop.run_until_complete(main(event_loop, 2 )) finally : event_loop.close()
9.获取协程返回值的方法 一个协程中await了另外一个协程 1 2 3 4 5 6 7 8 9 10 tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] dones, pendings = await asyncio.wait(tasks) for task in dones: print ('Task ret: ' , task.result())
asyncio.gather创建协程对象
如果使用的是 asyncio.gather创建协程对象,那么await的返回值就是协程运行的结果。
1 2 3 4 5 6 7 8 9 10 tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] results = await asyncio.gather(*tasks) for result in results: print ('Task ret: ' , result)
run_until_complete返回协程的结果 在函数里return await asyncio.gather(*tasks)
后就可以使用results = loop.run_until_complete(main())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 async def main (): coroutine1 = do_some_work(1 ) coroutine2 = do_some_work(2 ) coroutine3 = do_some_work(2 ) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] return await asyncio.gather(*tasks) loop = asyncio.get_event_loop() results = loop.run_until_complete(main()) for result in results: print ('Task ret: ' , result)
也可以使用return await asyncio.wait(tasks)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 async def main (): coroutine1 = do_some_work(1 ) coroutine2 = do_some_work(2 ) coroutine3 = do_some_work(4 ) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] return await asyncio.wait(tasks) start = now() loop = asyncio.get_event_loop() done, pending = loop.run_until_complete(main()) for task in done: print ('Task ret: ' , task.result())
asyncio的as_completed方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 async def main (): coroutine1 = do_some_work(1 ) coroutine2 = do_some_work(2 ) coroutine3 = do_some_work(4 ) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] for task in asyncio.as_completed(tasks): result = await task loop = asyncio.get_event_loop() done = loop.run_until_complete(main())
as_completed
最常使用的形式:
1 2 for task in asyncio.as_completed(tasks): result = await task
注意result = await task
中有一个await
10.asyncio的futrue对象
Future
对象表示一个还未完成的工作,事件循环可以监视 Future
对象的状态直至它变成 done ,这将运行程序的一部分等待另一部分完成一些工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import asynciodef mark_done (future, result ): print ('setting future result to {!r}' .format (result)) future.set_result(result) event_loop = asyncio.get_event_loop() all_done = asyncio.Future() try : print ('scheduling make_done' ) event_loop.call_soon(mark_done, all_done, 'the result' ) print ('entering event loop' ) result = event_loop.run_until_complete(all_done) print ('returned result: {!r}' .format (result)) finally : print ('closing event loop' ) event_loop.close() print ('future result: {!r}' .format (all_done.result()))
注意:event_loop.run_until_complete(all_done)
的意思是将all_done放入事件循环,然后开始时间循环的意思.(也就是说,这个事件循环可能存在已经注册了的task ) 简单来说就是 : ==启动事件循环,顺便再添加一个任务==
1 2 3 4 5 6 7 8 9 10 11 def main (): loop = asyncio.get_event_loop() asyncio.ensure_future(work(1 , 'A' )) loop.call_soon(hello, 'Tom' ) loop.create_task(work(2 , 'B' )) loop.run_until_complete(work(3 , 'C' ))
当调用 set_result
方法后,Future
对象的状态会被修改为 done, 同时 Future
实例也会保存设置的结果值,供随后使用
11.使用抽象类 Protocol 实现异步 I/O 服务端
EchoServer继承 asyncio.Protocol
,用来处理与客户端的通信。
protocol
的方法是基于服务端 socket 事件来触发的。
每当有一个新的客户端连接的时候,就会触发调用 connection_made()
方法。
1 2 3 4 5 6 7 def connection_made (self, transport ): self.transport = transport self.address = transport.get_extra_info('peername' ) self.log = logging.getLogger( 'EchoServer_{}_{}' .format (*self.address) ) self.log.debug('connection accepted' )
transport
参数是一个 asyncio.Transport
实例对象,这个对象抽象了一系列使用 socket 进行异步 I/O 操作的方法。
不同的通信协议提供了不同的 transport 实现,但是它们都有同样的 API.
(比如,有一些 transport 类用来与 socket 通信,有些用来跟子进程通过管道通信)
可以通过 get_extra_info()
获取进来的客户端的地址信息。
连接建立以后,当有数据从客户端发到服务端的时候会使用传输过来的数据调用 data_received()
方法。 这里我们记录一下收到的数据,然后立即发收到的数据通过 transport.write()
发回客户端。
1 2 3 4 def data_received (self, data ): self.log.debug('received {!r}' .format (data)) self.transport.write(data) self.log.debug('sent {!r}' .format (data))
一些 transport 支持一个特殊的 end-of-file 标识符(EOF
)。当遇到一个 EOF
的时候,eof_received()
方法会被调用。 在本次实现中,EOF
会被发送会客户端,表示这个信号已经被收到。因为不是所有的 transport 都支持这个 EOF
,这个协议会首先询问 transport 是否可以安全的发送 EOF
.
1 2 3 4 def eof_received (self ): self.log.debug('received EOF' ) if self.transport.can_write_eof(): self.transport.write_eof()
当一个连接被关闭的时候,无论是正常关闭还是因为一个错误导致的关闭,协议的 connection_lost()
方法都会被调用,如果是因为出错,参数中会包含一个相关的异常对象,否则这个对象就是 None
.
1 2 3 4 5 6 def connection_lost (self, error ): if error: self.log.error('ERROR: {}' .format (error)) else : self.log.debug('closing' ) super ().connection_lost(error)
需要两步来启动这个服务器。
首先,应用告诉事件循环创建使用 protocol 类和 hostname 以及 socket 监听的端口信息来创建一个新的 server 对象。
create_server()
方法是一个 coroutine, 所以它的结果必须通过事件循环来处理这样才能真正的启动服务器。
这个 coroutine 完成的时候会返回一个 与事件循环相关联的 asyncio.Server
实例.
1 2 3 factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS) server = event_loop.run_until_complete(factory) log.debug('starting up on {} port {}' .format (*SERVER_ADDRESS))
然后这个事件循环需要被运行,以便接收客户端请求以及处理相关事件。
对于一个长时间运行的服务器程序来说, run_forever()
方法是最简便的实现这个功能的方法。
当事件循环被停止的时候,无论是通过应用程序代码还是通过进程信号停止的,server 都可以被关闭以便能够正确的清理 socket 资源
1 2 3 4 5 6 7 8 try : event_loop.run_forever() finally : log.debug('closing server' ) server.close() event_loop.run_until_complete(server.wait_closed()) log.debug('closing event loop' ) event_loop.close()
客户端
使用 protocol 类实现一个客户端的代码跟实现一个服务器端非常的相似.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import asyncioimport functoolsimport loggingimport sysMESSAGES = [ b'This is the message. ' , b'It will be sent ' , b'in parts.' , ] SERVER_ADDRESS = ('localhost' , 10000 ) logging.basicConfig( level=logging.DEBUG, format ='%(name)s: %(message)s' , stream=sys.stderr, ) log = logging.getLogger('main' ) event_loop = asyncio.get_event_loop()
客户端 protocol 类定义了跟服务器端相同的方法,但是是不同的实现。
future
参数是一个 Future
实例,用来作为客户端已经完成了一次接收来自服务端数据操作 的信号。
1 2 3 4 5 6 class EchoClient (asyncio.Protocol ): def __init__ (self, messages, future ): super ().__init__() self.messages = messages self.log = logging.getLogger('EchoClient' ) self.f = future
当客户端成功连接到服务器时,会立即开始通信。客户端一次发送了一堆数据,因为网络等原因可能会把多个消息合并到一个消息中。当所有消息都送达的时候,将发送一个 EOF
。
虽然看起你所有的数据都立即被发送了,事实上 transport 对象会缓冲发出去的数据并且会设置一个回调来传输最终的数据,当 socket 的缓冲区准备好可以发送的时候会调用这个回调。这些都是由 transport 来实现的,所以应用代码可以按照 I/O 操作就像看起来那么发生的样子来实现.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def connection_made (self, transport ): self.transport = transport self.address = transport.get_extra_info('peername' ) self.log.debug( 'connectiong to {} port {}' .format (*self.address) ) for msg in self.messages: transport.write(msg) self.log.debug('sending {!r}' .format (msg)) if transport.can_write_eof(): transport.write_eof()
当接收到来着服务器端的响应时,将会把这个响应记录下来
1 2 def data_received (self, data ): self.log.debug('received {!r}' .format (data))
无论是收到 end-of-file 标记还是服务器端断开了连接,本地 transport 对象都将关闭并且 future 对象都会被通过设置一个结果值的方式标记为已完成。
1 2 3 4 5 6 7 8 9 10 11 12 def eof_received (self ): self.log.debug('received EOF' ) self.transport.close() if not self.f.done(): self.f.set_result(True ) def connnection_lost (self, exc ): self.log.debug('server closed connection' ) self.transport.close() if not self.f.done(): self.f.set_result(True ) super ().connectiong_lost(exc)
然后创建所需的 future, 以及客户端 coroutine
1 2 3 4 5 6 7 8 9 10 client_completed = asyncio.Future() client_factory = functools.partial( EchoClient, messages=MESSAGES, future=client_completed ) factory_coroutine = event_loop.create_connection( client_factory, *SERVER_ADDRESS, )
然后使用两次 wait 来处理客户端发送完成并退出的操作
1 2 3 4 5 6 7 og.debug('waiting for client to complete' ) try : event_loop.run_until_complete(factory_coroutine) event_loop.run_until_complete(client_completed) finally : log.debug('closing event loop' ) event_loop.close()
完整代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import asyncioimport loggingimport sysSERVER_ADDRESS = ('localhost' , 10000 ) logging.basicConfig( level=logging.DEBUG, format ='%(name)s: %(message)s' , stream=sys.stderr, ) log = logging.getLogger('main' ) event_loop = asyncio.get_event_loop() class EchoServer (asyncio.Protocol ): def connection_made (self, transport ): self.transport = transport self.address = transport.get_extra_info('peername' ) self.log = logging.getLogger( 'EchoServer_{}_{}' .format (*self.address) ) self.log.debug('connection accepted' ) def data_received (self, data ): self.log.debug('received {!r}' .format (data)) self.transport.write(data) self.log.debug('sent {!r}' .format (data)) def eof_received (self ): self.log.debug('received EOF' ) if self.transport.can_write_eof(): self.transport.write_eof() def connection_lost (self, error ): if error: self.log.error('ERROR: {}' .format (error)) else : self.log.debug('closing' ) super ().connection_lost(error) factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS) server = event_loop.run_until_complete(factory) log.debug('starting up on {} port {}' .format (*SERVER_ADDRESS)) try : event_loop.run_forever() finally : log.debug('closing server' ) server.close() event_loop.run_until_complete(server.wait_closed()) log.debug('closing event loop' ) event_loop.close()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 import asyncioimport functoolsimport loggingimport sysMESSAGES = [ b'This is the message. ' , b'It will be sent ' , b'in parts.' , ] SERVER_ADDRESS = ('localhost' , 10000 ) logging.basicConfig( level=logging.DEBUG, format ='%(name)s: %(message)s' , stream=sys.stderr, ) log = logging.getLogger('main' ) event_loop = asyncio.get_event_loop() class EchoClient (asyncio.Protocol ): def __init__ (self, messages, future ): super ().__init__() self.messages = messages self.log = logging.getLogger('EchoClient' ) self.f = future def connection_made (self, transport ): self.transport = transport self.address = transport.get_extra_info('peername' ) self.log.debug( 'connectiong to {} port {}' .format (*self.address) ) for msg in self.messages: transport.write(msg) self.log.debug('sending {!r}' .format (msg)) if transport.can_write_eof(): transport.write_eof() def data_received (self, data ): self.log.debug('received {!r}' .format (data)) def eof_received (self ): self.log.debug('received EOF' ) self.transport.close() if not self.f.done(): self.f.set_result(True ) def connnection_lost (self, exc ): self.log.debug('server closed connection' ) self.transport.close() if not self.f.done(): self.f.set_result(True ) super ().connectiong_lost(exc) client_completed = asyncio.Future() client_factory = functools.partial( EchoClient, messages=MESSAGES, future=client_completed ) factory_coroutine = event_loop.create_connection( client_factory, *SERVER_ADDRESS, ) log.debug('waiting for client to complete' ) try : event_loop.run_until_complete(factory_coroutine) event_loop.run_until_complete(client_completed) finally : log.debug('closing event loop' ) event_loop.close()