# Section: 进程与线程 / 使用多线程 / 继承Thread类
# (Processes and threads / Using multiple threads / Subclassing Thread)
import time
from threading import Thread


class thread1(Thread):
    """Thread subclass whose ``run`` prints a separator once per second, forever."""

    def run(self):
        while True:
            print('------------')
            time.sleep(1)


if __name__ == '__main__':
    # Start five workers; each one runs its own endless loop.
    for each in range(5):
        thread1().start()
# Section: 直接使用Thread类 (Using the Thread class directly)
import time
from threading import Thread


def func(num):
    """Print a counter starting at ``num`` and increasing by one every second.

    Never returns: the loop has no exit condition.
    """
    while True:
        print(num)
        num += 1
        time.sleep(1)


if __name__ == '__main__':
    t = Thread(target=func, args=(1,))
    t.start()
# Section: 使用ThreadPoolExecutor (Using ThreadPoolExecutor)
import time
import threading
from concurrent.futures import ThreadPoolExecutor


def func(num):
    """Print ``num`` behind a '---' marker, then sleep for one second."""
    print('---', num)
    time.sleep(1)


if __name__ == '__main__':
    with ThreadPoolExecutor(3) as e:
        # Consume the iterator: an exception raised inside a worker is only
        # re-raised when its result is retrieved from the map() iterator.
        list(e.map(func, range(10)))
# Section: 使用多进程 / 使用Process类
# (Using multiple processes / Using the Process class)
import time
from multiprocessing import Process


def func(num):
    """Print 0 .. num-1, one value per second."""
    for n in range(num):
        print(n)
        time.sleep(1)


# The guard is required: with the "spawn" start method every child re-imports
# this module, and unguarded start() calls would spawn processes recursively.
if __name__ == '__main__':
    for each in range(10):
        Process(target=func, args=(each,)).start()
使用Pool
注意 : 必须将它放在if __name__ == '__main__':
中
import time
from multiprocessing import Pool


def func(num):
    """Print 0 .. num-1, one value per second."""
    for n in range(num):
        print(n)
        time.sleep(1)


if __name__ == '__main__':
    p = Pool(10)
    for n in range(10):
        p.apply_async(func, args=(n,))
    # close() stops accepting new work; join() then waits for the queued
    # jobs to finish (join() must be preceded by close() or terminate()).
    p.close()
    p.join()
使用Queue
注意同样要使用if __name__ == '__main__':
import time
from multiprocessing import Process, Queue


def read(q):
    """Consumer: take one item per second from ``q`` and print it, forever."""
    while True:
        res = q.get()
        print(res)
        time.sleep(1)


def write(q):
    """Producer: put 1, 2, 3, ... onto ``q``, one value per second, forever."""
    num = 0
    while True:
        num += 1
        q.put(num)
        time.sleep(1)


if __name__ == '__main__':
    q = Queue()
    r = Process(target=read, args=(q,))
    w = Process(target=write, args=(q,))
    w.start()
    r.start()
    time.sleep(5)
    # Neither worker ever returns, so stop both forcibly.
    r.terminate()
    w.terminate()
    # Reap the terminated children so they are not left as zombies.
    r.join()
    w.join()
    print('强制结束')
# Section: 线程限制 / 编写一个资源抢占的多线程
# (Thread limits / A multithreaded program that races on a shared resource)
import time
from threading import Thread

# Shared counter that both threads update without any synchronisation.
num = 0


def func(count=1000000):
    """Increment the global ``num`` ``count`` times without holding a lock.

    ``num += 1`` is a read-modify-write, so concurrent callers may lose
    updates; that race is what this example demonstrates.
    """
    global num
    for _ in range(count):
        num += 1


if __name__ == '__main__':
    num = 0
    t1 = Thread(target=func)
    t2 = Thread(target=func)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(num)
# Section: 使用Lock (Using Lock)
import time
from threading import Thread, Lock

# Shared counter and the lock that serialises updates to it.
num = 0
lock = Lock()


def run1(count=1000000):
    """Increment the global ``num`` ``count`` times, holding ``lock`` per update."""
    global num
    for _ in range(count):
        with lock:
            num += 1


if __name__ == '__main__':
    num = 0
    lock = Lock()
    t1 = Thread(target=run1)
    t2 = Thread(target=run1)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(num)
使用RLock 可重入锁
import time
from threading import Thread, RLock


def fun():
    """Increment the global ``num`` 10000 times and print it after each step.

    The inner ``with lock`` re-acquires a lock this thread already holds;
    that only works because ``lock`` is an RLock (a plain Lock would block).
    """
    global num
    for _ in range(10000):
        with lock:
            num += 1
            with lock:
                print(num)


if __name__ == '__main__':
    lock = RLock()
    num = 0
    Thread(target=fun).start()
# Section: 使用ThreadLocal (Using thread-local storage)
from threading import Thread, local

# One thread-local container; every thread sees its own ``num`` attribute.
# It has its own name so it does not shadow the imported ``local`` class.
thread_data = local()


def fun(num, count=1000000):
    """Store ``num`` in this thread's slot, add one ``count`` times, print it."""
    thread_data.num = num
    for _ in range(count):
        thread_data.num += 1
    print('-----', thread_data.num)


if __name__ == '__main__':
    num = 0
    t1 = Thread(target=fun, args=(num,))
    t2 = Thread(target=fun, args=(num,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # The main thread's ``num`` is a plain local int and is never modified.
    print('-----', num)
# Section: 使用Semaphore (Using Semaphore)
import time
from threading import Thread, Semaphore


def fun(num):
    """Loop forever: bump the local ``num``, then print it while holding ``sem``.

    ``sem`` is created with a value of 3 in the main block, so at most three
    threads are inside the ``with`` body at any time.
    """
    while True:
        num += 1
        with sem:
            print('--------', num)
            time.sleep(2)


if __name__ == '__main__':
    sem = Semaphore(3)
    num = 0
    for i in range(9):
        Thread(target=fun, args=(i,)).start()
    time.sleep(15)
    # The workers only change their own parameter, so this still prints 0.
    print('*******', num)
使用Barrier 凑够一定数量的线程才能执行
import time
from threading import Thread, Barrier


def fun(num):
    """Block on the barrier ``bar``, then print ``num`` and sleep two seconds."""
    bar.wait()
    print('--------', num)
    time.sleep(2)


if __name__ == '__main__':
    # Barrier(3): wait() releases its callers in groups of three.
    bar = Barrier(3)
    num = 0
    for i in range(9):
        time.sleep(1)
        Thread(target=fun, args=(i,)).start()
    time.sleep(15)
    print('*******', num)
使用Timer 定时线程
import time
from threading import Thread, Timer


def fun():
    """Print a separator line, then sleep for two seconds."""
    print('--------')
    time.sleep(2)


if __name__ == '__main__':
    # Timer(5, fun) runs ``fun`` on its own thread after a five-second delay.
    t = Timer(5, fun)
    t.start()
    t.join()
    print('*******')
# Section: 线程通讯 / 使用Event (Thread communication / Using Event)
import time
from threading import Thread, Event


def fun():
    """Loop forever: wait for ``event``, print a separator, clear the event."""
    while True:
        event.wait()
        print('--------')
        event.clear()


if __name__ == '__main__':
    event = Event()
    t = Thread(target=fun)
    t.start()
    for _ in range(5):
        time.sleep(1)
        event.set()
    print('********')
wait – set – clear
对应着阻塞
– 解除阻塞
– 重新阻塞
整个流程
使用Condition 用于线程之间互相唤醒
import time
from threading import Thread, Condition


def fun1():
    """Loop forever: wait for a notification, print dashes, notify the peer."""
    while True:
        with cond:
            cond.wait()
            print('--------')
            time.sleep(1)
            cond.notify()


def fun2():
    """Loop forever: notify the peer, print stars, then wait to be notified."""
    while True:
        with cond:
            cond.notify()
            print('*******')
            time.sleep(1)
            cond.wait()


if __name__ == '__main__':
    cond = Condition()
    t1 = Thread(target=fun1)
    t2 = Thread(target=fun2)
    # NOTE(review): nothing guarantees fun1 reaches wait() before fun2's first
    # notify(); if the notify comes first it is lost and both threads block.
    t1.start()
    t2.start()
多任务的最佳实践 设计Master-Worker
模式,Master负责分配任务,Worker负责执行任务
主进程就是Master,其他进程就是Worker
优点:稳定性高: 一个子进程崩溃了,不会影响主进程和其他子进程,当然主进程挂掉了,所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低
缺点:
创建进程的代价大: 在Unix/Linux系统下,用fork调用还行,在Windows下创建进程开销巨大。
操作系统能同时运行的进程数有限: 在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题
协程
不能在生成器中使用await
不能在 async 定义的协程中使用 yield from(自 Python 3.6 起,在 async def 中使用 yield 得到的是异步生成器,而不是协程)
协程对象不能直接运行,必须放入事件循环中或者由 yield from 语句调用 。
yield from的两大优势
避免嵌套循环
转移控制权
asyncio的组件
event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
coroutine 协程: 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
task 任务: 一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装 ,其中包含任务的各种状态。
future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别 (task对象是Future类的子类 )
async/await 关键字: python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。
# Section: 传入loop的方法 / 直接传入协程
# (Ways to hand work to the loop / Passing a coroutine directly)
import asyncio


async def read():
    pass


async def write():
    pass


# Create the loop explicitly instead of relying on get_event_loop() creating
# one implicitly, which is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(write())
finally:
    loop.close()
# Section: 使用asyncio.wait传入协程列表
# (Passing a list of coroutines through asyncio.wait)
import asyncio


async def coro1():
    pass


async def coro2():
    pass


loop = asyncio.new_event_loop()
try:
    # asyncio.wait() no longer accepts bare coroutine objects (TypeError on
    # Python 3.11+), so wrap each coroutine in a Task first.
    tasks = [
        loop.create_task(coro1()),
        loop.create_task(coro2()),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
finally:
    loop.close()
# Section: 使用asyncio.gather传入协程 (Passing coroutines through asyncio.gather)
import asyncio


async def main():
    """Run three groups of ``func`` coroutines and gather the grouped results.

    ``func`` is not defined in this fragment; it must be a coroutine function
    taking a label and a number.
    """
    # gather() is called inside a coroutine so that a running loop exists
    # when the inner coroutines are wrapped into tasks.
    group1 = asyncio.gather(*[func("A", i) for i in range(1, 3)])
    group2 = asyncio.gather(*[func("B", i) for i in range(1, 5)])
    group3 = asyncio.gather(*[func("B", i) for i in range(1, 7)])
    return await asyncio.gather(group1, group2, group3)


loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
task 创建task的两种方法 使用create_task
import asyncio


async def do_some_work(x):
    pass


# The original fragment used ``loop`` without defining it.
loop = asyncio.new_event_loop()
task = loop.create_task(do_some_work(2))
# Run the task so that it is not left pending when the loop is discarded.
loop.run_until_complete(task)
loop.close()
使用ensure_future
import asyncio


async def do_some_work(x):
    pass


loop = asyncio.new_event_loop()
# ensure_future() needs a coroutine *object*; the original passed the
# undefined name ``coroutine`` instead of calling do_some_work().
task = asyncio.ensure_future(do_some_work(2), loop=loop)
loop.run_until_complete(task)
loop.close()
# Section: task绑定回调 (Attaching a callback to a task)
import asyncio


async def coroutine(x):
    pass


def callback(future):
    """Done-callback; it is called with the finished task as its argument."""
    pass


loop = asyncio.new_event_loop()
# Call the coroutine function: ensure_future() rejects the function object.
task = asyncio.ensure_future(coroutine(2), loop=loop)
task.add_done_callback(callback)
loop.run_until_complete(task)
loop.close()
# Section: task获取结果 / 使用task.result()
# (Getting a task's result / Using task.result())
import asyncio


async def coroutine(x):
    pass


# The original fragment used ``loop`` without defining it, and passed the
# coroutine function itself rather than a coroutine object.
loop = asyncio.new_event_loop()
task = asyncio.ensure_future(coroutine(2), loop=loop)
loop.run_until_complete(task)
# result() is only valid once the task has finished.
print(task.result())
loop.close()
# Section: 使用asyncio.wait(tasks) (Using asyncio.wait(tasks))
import asyncio


async def main():
    """Wait for three tasks and print each result.

    ``coroutine1`` .. ``coroutine3`` are coroutine objects that this fragment
    does not define. ``await`` is only legal inside ``async def``, so the
    statements are wrapped in a coroutine.
    """
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print('Task ret: ', task.result())
# Section: loop.run_until_complete配合asyncio.gather
# (loop.run_until_complete together with asyncio.gather)
import asyncio


async def main():
    """Run three ``do_some_work`` coroutines and return their results in order.

    ``do_some_work`` is not defined in this fragment.
    """
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    # 4, matching the asyncio.wait and as_completed variants of this example.
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    # gather() returns the results in the same order as ``tasks``.
    return await asyncio.gather(*tasks)


loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
    for result in results:
        print('Task ret: ', result)
finally:
    loop.close()
# Section: loop.run_until_complete配合asyncio.wait
# (loop.run_until_complete together with asyncio.wait)
import asyncio


async def main():
    """Run three ``do_some_work`` coroutines; return ``(done, pending)`` sets.

    ``do_some_work`` is not defined in this fragment.
    """
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    return await asyncio.wait(tasks)


# The original also had ``start = now()``: ``now`` was undefined and the
# value was never used, so the line is dropped.
loop = asyncio.new_event_loop()
try:
    done, pending = loop.run_until_complete(main())
    for task in done:
        print('Task ret: ', task.result())
finally:
    loop.close()
# Section: asyncio的as_completed方法 (asyncio's as_completed)
import asyncio


async def main():
    """Run three ``do_some_work`` coroutines and print results as they finish.

    ``do_some_work`` is not defined in this fragment.
    """
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    # as_completed() yields awaitables in completion order, not list order.
    for task in asyncio.as_completed(tasks):
        result = await task
        # The original discarded ``result``; print it like the sibling examples.
        print('Task ret: ', result)


loop = asyncio.new_event_loop()
try:
    # main() returns None, so there is nothing worth binding here.
    loop.run_until_complete(main())
finally:
    loop.close()
动态注册协程 同步
当前线程创建一个事件循环,然后再新建一个线程,在新线程中启动事件循环。当前线程不会被block。
import asyncio
import time
from threading import Thread


def start_loop(loop):
    """Thread target: make ``loop`` this thread's event loop and run it forever."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


def more_work(x):
    """Blocking job: sleep ``x`` seconds, printing before and after."""
    print('More work {}'.format(x))
    # time.sleep blocks the loop's thread, so queued callbacks run one
    # after another rather than concurrently.
    time.sleep(x)
    print('Finished more work {}'.format(x))


if __name__ == '__main__':
    start = time.time()
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()
    print('TIME: {}'.format(time.time() - start))

    # call_soon_threadsafe is the supported way to schedule a callback on a
    # loop that is running in another thread.
    new_loop.call_soon_threadsafe(more_work, 6)
    new_loop.call_soon_threadsafe(more_work, 3)
# Section: 异步 (Asynchronous)
import time
import asyncio
from queue import Queue
from threading import Thread


def start_loop(loop):
    """Thread target: make ``loop`` this thread's event loop and run it forever."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_sleep(x, queue, msg=""):
    """Sleep ``x`` seconds without blocking the loop, then put ``msg`` on ``queue``."""
    await asyncio.sleep(x)
    queue.put(msg)


if __name__ == '__main__':
    queue = Queue()
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()
    print(time.ctime())

    # run_coroutine_threadsafe submits a coroutine to a loop that is running
    # in another thread.
    asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop)

    # Report each message as it arrives; the loop never exits by itself.
    while True:
        msg = queue.get()
        print("{} 协程运行完..".format(msg))
        print(time.ctime())
async with 和async for async with 异步上下文管理器指的是在enter和exit方法处能够暂停执行的上下文管理器 。
实现这样的功能,需要加入两个新的方法:__aenter__ 和__aexit__。
这两个方法都要返回一个 awaitable 类型的值。
async with EXPR as VAR:
    BLOCK

在语义上等价于:

mgr = (EXPR)
aenter = type(mgr).__aenter__(mgr)
aexit = type(mgr).__aexit__
exc = True

VAR = await aenter
try:
    BLOCK
except:
    if not await aexit(mgr, *sys.exc_info()):
        raise
else:
    await aexit(mgr, None, None, None)
简单来说就是:
进来的时候需要await aenter
(延时操作)
出去的时候需要await aexit
(延时操作)
async for
异步迭代器就是能够在next方法中调用异步代码 。
一个异步可迭代对象(asynchronous iterable)能够在迭代过程中调用异步代码
为了支持异步迭代:
一个对象必须实现__aiter__方法,该方法返回一个异步迭代器(asynchronous iterator)对象。
一个异步迭代器对象必须实现__anext__方法,该方法返回一个awaitable 类型的值。
为了停止迭代,__anext__必须抛出一个 StopAsyncIteration 异常。
async for TARGET in ITER:
    BLOCK
else:
    BLOCK2

在语义上等价于:

iter = (ITER)
iter = type(iter).__aiter__(iter)
running = True
while running:
    try:
        TARGET = await type(iter).__anext__(iter)
    except StopAsyncIteration:
        running = False
    else:
        BLOCK
else:
    BLOCK2
# Section: loop.stop暂停事件循环 (Stopping the event loop with loop.stop)
import asyncio


async def work(loop, t):
    """Sleep ``t`` seconds, then ask ``loop`` to stop."""
    print('start')
    await asyncio.sleep(t)
    print('after {}s stop'.format(t))
    loop.stop()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    # Bind the task to this loop explicitly; no loop is running yet.
    task = asyncio.ensure_future(work(loop, 1), loop=loop)
    # run_forever() returns once work() has called loop.stop().
    loop.run_forever()
    loop.close()
将普通函数作为任务注入事件循环的三种方法
loop.call_soon
loop.call_later
loop.call_at & loop.time
loop.call_soon 将普通函数作为任务加入到事件循环并立即排定任务的执行顺序
import asyncio
import time


def hello(name):
    """Plain (non-coroutine) callback: print a greeting for ``name``."""
    print('[hello] Hello, {}'.format(name))


async def work(t, name):
    """Sleep ``t`` seconds, printing a start and a stop line tagged with ``name``."""
    print('[work ] start', name)
    await asyncio.sleep(t)
    print('[work ] {} after {}s stop'.format(name, t))


def main():
    """Queue two tasks and one plain callback, then run until work 'C' ends."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Items are queued in this order; nothing runs until the loop starts.
    asyncio.ensure_future(work(1, 'A'), loop=loop)
    loop.call_soon(hello, 'Tom')
    loop.create_task(work(2, 'B'))
    # The loop stops when 'C' finishes (3 s), after 'A' and 'B' are done.
    loop.run_until_complete(work(3, 'C'))


if __name__ == '__main__':
    main()
loop.call_later 可延时执行,第一个参数为延时时间
import asyncio
import functools


def hello(name):
    """Plain (non-coroutine) callback: print a greeting for ``name``."""
    print('[hello] Hello, {}'.format(name))


async def work(t, name):
    """Sleep ``t`` seconds, printing a start and a stop line tagged with ``name``."""
    print('[work{}] start'.format(name))
    await asyncio.sleep(t)
    print('[work{}] stop'.format(name))


def main():
    """Mix call_soon and call_later callbacks with two tasks; run until 'B' ends."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    asyncio.ensure_future(work(1, 'A'), loop=loop)
    # call_later(delay, callback, *args): delay is in seconds.
    loop.call_later(1.2, hello, 'Tom')
    loop.call_soon(hello, 'Kitty')
    task4 = loop.create_task(work(2, 'B'))
    loop.call_later(1, hello, 'Jerry')
    loop.run_until_complete(task4)


if __name__ == '__main__':
    main()
注意,call_later 的延时(这里的 1.2 秒)是从调用 call_later 的那一刻开始计时的;本例中注册之后立即启动事件循环,所以近似等于从事件循环启动时开始计时
loop.call_at & loop.time
call_at 在某时刻执行
loop.time 就是事件循环内部的一个计时方法,返回值是时刻,数据类型是 float
import asyncio
import functools


def hello(name):
    """Plain (non-coroutine) callback: print a greeting for ``name``."""
    print('[hello] Hello, {}'.format(name))


async def work(t, name):
    """Sleep ``t`` seconds, printing a start and a stop line tagged with ``name``."""
    print('[work{}] start'.format(name))
    await asyncio.sleep(t)
    print('[work{}] stop'.format(name))


def main():
    """Schedule callbacks at absolute loop times with call_at; run until 'B' ends."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # loop.time() is the loop's own clock; call_at takes values on that clock.
    start = loop.time()
    asyncio.ensure_future(work(1, 'A'), loop=loop)
    loop.call_at(start + 1.2, hello, 'Tom')
    loop.call_soon(hello, 'Kitty')
    task4 = loop.create_task(work(2, 'B'))
    loop.call_at(start + 1, hello, 'Jerry')
    loop.run_until_complete(task4)


if __name__ == '__main__':
    main()
协程的通讯方法 asyncio.lock 协程锁存在的意义 :
协程锁锁住了一些代码A,在执行代码A的时候,有可能就切换协程了,此时的协程锁就会锁住
这样就保证了协程锁里面的代码一定会等待被执行完毕.
协程锁的固定用法是使用 async with 创建协程锁的上下文环境 ,将代码块写入其中。
import asyncio

l = []
# Created in main() once the event loop exists, so that the lock belongs to
# the loop that actually runs the coroutines.
lock = None


async def work(name):
    """Append 'x' to ``l`` once, under ``lock``; return ``name``.

    The second caller blocks on the lock while the first is suspended in
    ``asyncio.sleep(0)``, then finds 'x' already present and returns early.
    """
    print(r'\\\\\\\\\\\\', name)
    async with lock:
        print('{} start'.format(name))
        if 'x' in l:
            return name
        print('++++++++++', name)
        # Yield to the loop while still holding the lock.
        await asyncio.sleep(0)
        print('----------', name)
        l.append('x')
        print('{} end'.format(name))
        return name


async def one():
    name = await work('one')
    print('{} ok'.format(name))


async def two():
    print('///////////')
    name = await work('two')
    print('{} ok'.format(name))


def main():
    """Run ``one`` and ``two`` concurrently on a fresh event loop."""
    global lock
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    lock = asyncio.Lock()
    try:
        # asyncio.wait() needs Tasks; bare coroutines raise TypeError on 3.11+.
        tasks = [loop.create_task(one()), loop.create_task(two())]
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()


if __name__ == '__main__':
    main()
asyncio.Event asyncio.Event
类似 threading.Event
用来允许多个消费者等待某个事情发生,不用通过监听一个特殊的值来实现类似通知的功能。
import asyncio
import functools


def set_event(event):
    """Plain callback: set ``event``, waking every coroutine waiting on it."""
    print('setting event in callback')
    event.set()


async def coro1(event):
    print('coro1 waiting for event')
    await event.wait()
    print('coro1 triggered')


async def coro2(event):
    print('coro2 waiting for event')
    await event.wait()
    print('coro2 triggered')


async def main(loop):
    """Start two waiters on one Event and set it from a delayed callback."""
    event = asyncio.Event()
    print('event start state: {}'.format(event.is_set()))

    loop.call_later(
        0.1, functools.partial(set_event, event)
    )

    # asyncio.wait() needs Tasks; bare coroutines raise TypeError on 3.11+.
    waiters = [
        loop.create_task(coro1(event)),
        loop.create_task(coro2(event)),
    ]
    await asyncio.wait(waiters)
    print('event end state: {}'.format(event.is_set()))


if __name__ == '__main__':
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
asyncio.condition Condition
的效果类似 Event
,不同的是它不是唤醒所有等待中的 coroutine, 而是通过 notify()
唤醒指定数量的待唤醒 coroutine。
import asyncio


async def consumer(condition, n):
    """Wait on ``condition`` until notified, printing progress for consumer ``n``."""
    print('----')
    async with condition:
        print('consumer {} is waiting'.format(n))
        await condition.wait()
        print('consumer {} triggered'.format(n))
    print('ending consumer {}'.format(n))


async def manipulate_condition(condition):
    """Wake one consumer, then two, then all that remain."""
    print('starting manipulate_condition')

    # Give the consumers time to start waiting.
    await asyncio.sleep(0.1)

    for i in range(1, 3):
        async with condition:
            print('notifying {} consumers'.format(i))
            condition.notify(n=i)
        await asyncio.sleep(0.1)

    async with condition:
        print('notifying remaining consumers')
        condition.notify_all()

    print('ending manipulate_condition')


async def main(loop):
    """Run five consumers against one Condition driven by manipulate_condition."""
    condition = asyncio.Condition()

    # Schedule the notifier first, as the original did.
    loop.create_task(manipulate_condition(condition))

    # asyncio.wait() needs Tasks; bare coroutines raise TypeError on 3.11+.
    consumers = [
        loop.create_task(consumer(condition, i))
        for i in range(5)
    ]
    await asyncio.wait(consumers)


if __name__ == '__main__':
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        result = event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
asyncio.Queue
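与 queue.Queue 一样提供 task_done() 和 join()。
queue.task_done() 的功能 : 每 put 一个元素,队列的"未完成任务"计数加一;每调用一次 task_done(),该计数减一(元素本身是由 get() 取出的,task_done 并不删除元素)。
这样在最后 queue.join() 的时候,就根据未完成任务计数是否为零来判断队列中的任务是否全部处理完毕。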
import asyncio


async def consumer(n, q):
    """Process items from ``q`` until a ``None`` stop signal arrives."""
    # The original printed "waiting for item" here as well; this first line
    # marks start-up, so it now says "starting".
    print('consumer {}: starting'.format(n))
    while True:
        print('consumer {}: waiting for item'.format(n))
        item = await q.get()
        print('consumer {}: has item {}'.format(n, item))
        if item is None:
            # The stop signal counts as a queue item, so acknowledge it too;
            # otherwise q.join() in the producer would never return.
            q.task_done()
            break
        else:
            await asyncio.sleep(0.01 * item)
            q.task_done()
    print('consumer {}: ending'.format(n))


async def producer(q, num_workers):
    """Queue ``num_workers * 3`` jobs, then one ``None`` stop signal per worker."""
    print('producer: starting')
    # put() suspends while the bounded queue is full.
    for i in range(num_workers * 3):
        await q.put(i)
        print('producer: added task {} to the queue'.format(i))

    print('producer: adding stop signals to the queue')
    for i in range(num_workers):
        await q.put(None)
    print('producer: waiting for queue to empty')
    await q.join()
    print('producer: ending')


async def main(loop, num_consumers):
    """Run ``num_consumers`` consumers and one producer over a bounded queue."""
    # maxsize bounds the queue, so the producer cannot run far ahead.
    q = asyncio.Queue(maxsize=num_consumers)

    consumers = [
        loop.create_task(consumer(i, q))
        for i in range(num_consumers)
    ]
    prod = loop.create_task(producer(q, num_consumers))

    await asyncio.wait(consumers + [prod])


if __name__ == '__main__':
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        event_loop.run_until_complete(main(event_loop, 2))
    finally:
        event_loop.close()