Source: https://www.cnblogs.com/wongbingming/p/9028851.html
## Checking a thread's state with t.is_alive()

Use `t.is_alive()` to check whether a thread is currently running:
```python
from threading import Thread
import time

def printer():
    time.sleep(8)
    print('this is in printer')

t = Thread(target=printer)
print(t.is_alive())   # False: not started yet
t.start()
print(t.is_alive())   # True: now running
```
## Nested locks and the reentrant lock (RLock)

Sometimes the same thread needs to request the same resource more than once (that is, acquire the same lock again), commonly called nested locking:
```python
import threading

def main():
    n = 0
    lock = threading.Lock()
    with lock:
        for i in range(10):
            n += 1
        with lock:   # second acquire of the same Lock: blocks forever
            print(n)

t1 = threading.Thread(target=main)
t1.start()
```
The second acquisition finds the lock already taken, and taken by the same thread. The thread therefore waits on itself and the program hangs. Besides `Lock`, the `threading` module provides a reentrant lock, `RLock`, specifically to handle this situation.
A thread that holds a reentrant lock can acquire it again, multiple times, without blocking. The only requirement is that the number of `acquire()` calls in that thread ultimately match the number of `release()` calls.
```python
import threading

def main():
    n = 0
    lock = threading.RLock()
    with lock:
        for i in range(10):
            n += 1
        with lock:   # re-acquiring an RLock in the same thread is fine
            print(n)

t1 = threading.Thread(target=main)
t1.start()
```
A reentrant lock relaxes acquisition only within the same thread; in every other respect it behaves exactly like `Lock`.
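A quick sketch of that boundary (my own snippet, not from the original post): reentrancy helps only the owning thread, while any other thread still blocks as with a normal `Lock`:

```python
import threading

rlock = threading.RLock()

def other_thread():
    # a different thread cannot enter, reentrant or not
    print('other thread acquired:', rlock.acquire(timeout=1))  # False

rlock.acquire()
rlock.acquire()            # fine: the owning thread may re-acquire
t = threading.Thread(target=other_thread)
t.start()
t.join()
rlock.release()
rlock.release()            # releases must match acquires
```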
## Locking schemes that prevent deadlock

Deadlock usually arises in one of two ways:

- a single thread acquires the same lock in a nested fashion;
- several threads acquire multiple locks in inconsistent orders.
### Using a ThreadLocal object

```python
import threading

def run(x, n):
    # x is a plain local copy here; the net effect on it is zero
    x += n
    x -= n

def func(n):
    # each thread stores its own local.x, invisible to the other thread
    local.x = num
    for i in range(1000000):
        run(local.x, n)
    print(f'{threading.current_thread().name}------{local.x}')

if __name__ == '__main__':
    local = threading.local()
    num = 0
    t1 = threading.Thread(target=func, args=(6,))
    t2 = threading.Thread(target=func, args=(9,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(num)   # still 0: no thread ever touched the shared global
```
The trick: inside each thread's target function, keep any variable that would otherwise need locking in the thread-local object instead.
As long as two (or more) threads acquire nested locks in one fixed global order, the program cannot enter a deadlocked state.
### A helper function that sorts the locks

```python
import threading
from contextlib import contextmanager

_local = threading.local()

@contextmanager
def acquire(*locks):
    # sort locks by object id to enforce a global acquisition order
    locks = sorted(locks, key=lambda x: id(x))

    # detect violations of the order against locks already held
    acquired = getattr(_local, 'acquired', [])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock order violation')

    acquired.extend(locks)
    _local.acquired = acquired
    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # release in the reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]

if __name__ == '__main__':
    x_lock = threading.Lock()
    y_lock = threading.Lock()

    def thread_1():
        while True:
            with acquire(x_lock):
                with acquire(y_lock):
                    print('Thread-1')

    def thread_2():
        while True:
            with acquire(y_lock):
                with acquire(x_lock):
                    print('Thread-2')

    t1 = threading.Thread(target=thread_1)
    t1.daemon = True
    t1.start()

    t2 = threading.Thread(target=thread_2)
    t2.daemon = True
    t2.start()
```
On the surface, `thread_1` acquires lock `x` first and then lock `y`, while `thread_2` acquires `y` first and then `x`.
In reality, the `acquire` helper has already sorted the `x` and `y` locks, so `thread_1` and `thread_2` end up acquiring them in the same order, and no deadlock can occur.
## Thread messaging and task coordination

Controlling multiple threads is, at bottom, a message-passing problem: you use the messaging mechanism to send instructions that tell a thread when it may run, when it must not, and what it should do.
Threads can communicate in roughly three ways:

- `threading.Event`
- `threading.Condition`
- `queue.Queue`
### Event

Python provides a very simple communication mechanism, `threading.Event`: a general-purpose condition flag. Multiple threads can wait for the event, and once it fires, all waiting threads are woken up.
The three Event methods:

```python
event = threading.Event()

# reset the internal flag to False
event.clear()
# block until the internal flag becomes True
event.wait()
# set the internal flag to True, waking every waiter
event.set()
```
Put simply:

- `event.clear()`: reset the event's flag
- `event.wait()`: block the calling thread until the flag is set
- `event.set()`: set the flag and unblock all waiting threads
```python
import time
import threading

class MyThread(threading.Thread):
    def __init__(self, name, event):
        super().__init__()
        self.name = name
        self.event = event

    def run(self):
        print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
        # every thread blocks here until the event is set
        self.event.wait()
        print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))

threads = []
event = threading.Event()

for i in range(1, 5):
    threads.append(MyThread(str(i), event))

event.clear()
for t in threads:
    t.start()

print('Waiting 5s...')
time.sleep(5)

print('Waking all threads...')
event.set()
```
### Condition

`Condition` is similar to `Event`; there is not much difference between them, except that a `Condition` is built around a lock.
```python
cond = threading.Condition()

# acquire the underlying lock
cond.acquire()
# release the underlying lock
cond.release()

# release the lock and block until another thread calls notify()
cond.wait()
# wake up one waiting thread
cond.notify()
# wake up all waiting threads (notifyAll() is a deprecated alias)
cond.notify_all()
```
```python
import threading, time

class Hider(threading.Thread):
    def __init__(self, cond, name):
        super(Hider, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        time.sleep(1)   # make sure the Seeker is already waiting
        self.cond.acquire()

        print(self.name + ': I have covered my eyes')
        self.cond.notify()
        self.cond.wait()

        print(self.name + ': Found you ~_~')
        self.cond.notify()
        self.cond.release()

        print(self.name + ': I win')

class Seeker(threading.Thread):
    def __init__(self, cond, name):
        super(Seeker, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        self.cond.acquire()
        self.cond.wait()

        print(self.name + ': I am hidden, come find me')
        self.cond.notify()
        self.cond.wait()

        self.cond.release()
        print(self.name + ': You found me, oh no~~~')

cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()
```
Note: you must acquire the lock with `self.cond.acquire()` before you may call `self.cond.wait()` or `self.cond.notify()`, and at the end you must release it again with `self.cond.release()`.

A `Condition` is also a context manager, so the acquire/release pair can equally be written as `with cond:`:
```python
import threading

class Me(threading.Thread):
    def run(self):
        with cond:
            print('hello1')
            cond.notify()
            cond.wait()
            print(1)

class You(threading.Thread):
    def run(self):
        with cond:
            print('hello')
            cond.wait()
            cond.notify()
            print(2)

if __name__ == '__main__':
    cond = threading.Condition()
    You().start()
    Me().start()
```
Note: the two threads must be started in exactly this order. If they were swapped, the first thread could send its `notify()` before the other thread has even started waiting; the notification would be lost and both threads would then hang in `wait()` forever.
### Queue

Probably the safest way to send data from one thread to another is a queue from the `queue` library: create one `Queue` object shared by several threads, and let them add and remove items with `put()` and `get()`.
#### Basic usage

```python
from queue import Queue

# maxsize=0 means the queue is unbounded
q = Queue(maxsize=0)

# remove and return an item; blocks while the queue is empty
q.get()
# the same, but give up after 5 seconds (raises queue.Empty)
q.get(timeout=5.0)
# add an item to the queue
q.put(item)
# block until every item has been fetched and marked done with task_done()
q.join()

# number of items currently in the queue
q.qsize()
# whether the queue is empty / full
q.empty()
q.full()
```
```python
from queue import Queue
from threading import Thread
import time

class Student(Thread):
    def __init__(self, name, queue):
        super().__init__()
        self.name = name
        self.queue = queue

    def run(self):
        while True:
            msg = self.queue.get()
            if msg == self.name:
                print("{}: Present!".format(self.name))
                break
            # the message is addressed to someone else: put it back
            self.queue.put(msg)

class Teacher:
    def __init__(self, queue):
        self.queue = queue

    def call(self, student_name):
        print("Teacher: is {} here?".format(student_name))
        self.queue.put(student_name)

queue = Queue()
teacher = Teacher(queue=queue)

s1 = Student(name="小明", queue=queue)
s2 = Student(name="小亮", queue=queue)
s1.start()
s2.start()

print('Roll call starts~')
teacher.call('小明')
time.sleep(1)
teacher.call('小亮')
```
#### FIFO, LIFO, and priority queues

`queue.Queue` is not the only queue class; the module also provides `queue.LifoQueue` and `queue.PriorityQueue`:

- `queue.Queue`: first-in, first-out — messages are consumed in the order they entered the queue
- `queue.LifoQueue`: last-in, first-out
- `queue.PriorityQueue`: priority queue (see the sketch after the two demos below)
```python
import queue

q = queue.Queue()

for i in range(5):
    q.put(i)

# prints 0 1 2 3 4: first in, first out
while not q.empty():
    print(q.get())
```
```python
import queue

q = queue.LifoQueue()

for i in range(5):
    q.put(i)

# prints 4 3 2 1 0: last in, first out
while not q.empty():
    print(q.get())
```
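The original post stops at FIFO and LIFO; as a small added sketch, `queue.PriorityQueue` serves the smallest item first, so `(priority, data)` tuples come out in priority order:

```python
import queue

q = queue.PriorityQueue()

# items are (priority, data) tuples; the lowest priority value is served first
q.put((3, 'low'))
q.put((1, 'urgent'))
q.put((2, 'normal'))

while not q.empty():
    print(q.get())   # (1, 'urgent'), (2, 'normal'), (3, 'low')
```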
## Data isolation between threads

Isolation here means: given threads A and B, a variable's value in thread A is not shared with thread B.

The `threading.local` class makes this easy to arrange: even for one and the same variable, each thread sees its own private value.
```python
from threading import local, Thread, current_thread

class MyThread(Thread):
    def run(self):
        print("Before assignment - child thread:", current_thread(), local_data.__dict__)
        local_data.name = self.name
        print("After assignment - child thread:", current_thread(), local_data.__dict__)

if __name__ == '__main__':
    local_data = local()
    print("Before start - main thread:", local_data.__dict__)

    t1 = MyThread()
    t1.start()
    t1.join()

    t2 = MyThread()
    t2.start()
    t2.join()

    # still empty: the children's assignments are invisible here
    print("After finish - main thread:", local_data.__dict__)
```
`local` is a dictionary-like object: you store the variables you want isolated on it as key-value attributes. A single, global `local` instance suffices, because every store or lookup is routed to a different storage area depending on which thread is running.
Three conclusions follow:

- a child thread cannot see the main thread's values, even though the `local` object is a global variable;
- the main thread cannot see values set by a child thread;
- child threads cannot see each other's values.

Put simply: the main thread and all child threads are pairwise isolated.
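Conceptually, `local` behaves roughly like a dictionary keyed by the current thread id. The sketch below is my own simplified illustration of that idea, not how `threading.local` is actually implemented:

```python
import threading

class SimplifiedLocal:
    """Illustration only: one attribute namespace per thread."""
    def __init__(self):
        object.__setattr__(self, '_storage', {})

    def __setattr__(self, name, value):
        # route the write to the current thread's private dict
        self._storage.setdefault(threading.get_ident(), {})[name] = value

    def __getattr__(self, name):
        # only called when normal lookup fails, i.e. for user attributes
        try:
            return self._storage[threading.get_ident()][name]
        except KeyError:
            raise AttributeError(name)

local_data = SimplifiedLocal()
local_data.x = 1                      # visible only in this thread
threading.Thread(target=lambda: setattr(local_data, 'x', 99)).start()
print(local_data.x)                   # still 1 in the main thread
```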
A practical application: `LazyConnection` uses `threading.local` so that each thread gets its own lazily-opened socket:

```python
import threading
from functools import partial
from socket import socket, AF_INET, SOCK_STREAM

class LazyConnection:
    def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
        self.address = address
        self.family = family
        self.type = type
        # one independent socket per thread
        self.local = threading.local()

    def __enter__(self):
        if hasattr(self.local, 'sock'):
            raise RuntimeError('Already connected')
        self.local.sock = socket(self.family, self.type)
        self.local.sock.connect(self.address)
        return self.local.sock

    def __exit__(self, exc_ty, exc_val, tb):
        self.local.sock.close()
        del self.local.sock

def spider(conn, website):
    with conn as s:
        header = 'GET / HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n'.format(website)
        s.send(header.encode("utf-8"))
        resp = b''.join(iter(partial(s.recv, 100000), b''))
    print('Got {} bytes'.format(len(resp)))

if __name__ == '__main__':
    conn = LazyConnection(('www.sina.com.cn', 80))

    t1 = threading.Thread(target=spider, args=(conn, "news.sina.com.cn"))
    t2 = threading.Thread(target=spider, args=(conn, "blog.sina.com.cn"))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
```
## Creating a thread pool

Thread pools are created with the `ThreadPoolExecutor` class from the `concurrent.futures` library.
### Using the map method

```python
import time
from concurrent.futures import ThreadPoolExecutor

def target(num):
    for i in range(5):
        num += 1
        time.sleep(1)
    return num

with ThreadPoolExecutor(5) as exe:
    # map() yields results in the order the arguments were submitted
    res = exe.map(target, range(5))
    for each in res:
        print(each)
```
### Using the submit method

```python
import time
import threading
from concurrent.futures import ThreadPoolExecutor

def target():
    for i in range(5):
        print('running thread-{}:{}'.format(threading.get_ident(), i))
        time.sleep(1)

pool = ThreadPoolExecutor(5)

# 10 tasks submitted, but at most 5 run concurrently
for i in range(10):
    pool.submit(target)
```
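`submit()` also returns a `Future`, which the example above discards. As a small added sketch (the value-returning `target` here is my own, not from the post), results can be harvested as tasks finish:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def target(num):
    time.sleep(1)
    return num * 2

with ThreadPoolExecutor(5) as pool:
    # submit() returns a Future right away; result() blocks until it is done
    futures = [pool.submit(target, i) for i in range(10)]
    for fut in as_completed(futures):   # yields each future as it completes
        print(fut.result())
```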
### A hand-rolled thread pool

```python
import time
import threading
from queue import Queue

def target(q):
    while True:
        msg = q.get()
        for i in range(5):
            print('running thread-{}:{}'.format(threading.get_ident(), i))
            time.sleep(1)
        # tell queue.join() this item has been fully processed
        q.task_done()

def pool(workers, queue):
    for n in range(workers):
        t = threading.Thread(target=target, args=(queue,))
        # daemon workers die together with the main thread
        t.daemon = True
        t.start()

if __name__ == '__main__':
    queue = Queue()
    pool(5, queue)

    for i in range(100):
        queue.put("start")

    # block until every queued item is done
    queue.join()
```
## Iterators and iterables

At the level of code:

- an iterable is anything that implements the `__iter__` method;
- an iterator implements both `__iter__` and `__next__`.

This gives iterators their biggest practical advantage over plain iterables: an iterator can hand out values one at a time without a `for` loop, simply by calling `next()` on it (see the sketch below).
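A minimal sketch of the protocol (the `Counter` class is a made-up example, not from the post): one class carrying both methods is at once an iterable and its own iterator:

```python
class Counter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __iter__(self):
        # iterable protocol: return an iterator (here, itself)
        return self

    def __next__(self):
        # iterator protocol: produce the next value or raise StopIteration
        if self.current >= self.stop:
            raise StopIteration
        self.current += 1
        return self.current

c = Counter(3)
print(next(c))   # 1 -- no for loop needed
print(next(c))   # 2
for v in c:      # a for loop just calls next() until StopIteration
    print(v)     # 3
```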
## What a delegating generator does

```python
def average_gen():
    total = 0
    count = 0
    average = 0
    while True:
        new_num = yield average
        count += 1
        total += new_num
        average = total / count

def proxy_gen():
    # the delegating generator: a transparent two-way channel
    while True:
        yield from average_gen()

def main():
    calc_average = proxy_gen()
    next(calc_average)            # prime the generator
    print(calc_average.send(10))  # 10.0
    print(calc_average.send(20))  # 15.0
    print(calc_average.send(30))  # 20.0

if __name__ == '__main__':
    main()
```
The delegating generator above acts purely as a two-way channel between the caller and the inner generator. `yield from` itself provides that channel, and on top of it handles a long list of exception cases thoroughly on our behalf; implementing all of that by hand would be both harder to write and far less readable.
## The asyncio framework

### The difference between await and yield from

```python
import asyncio

@asyncio.coroutine
def hello():
    yield from asyncio.sleep(1)
```

Putting the `@asyncio.coroutine` decorator on a generator function marks its function object as a coroutine, though under the hood it is still a generator. Once marked, it can be used as a coroutine. (This decorator style predates `async def` and was removed in Python 3.11.)
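For comparison, here is the same coroutine in the native `async def` form that replaced the decorator:

```python
import asyncio

# the modern, native-coroutine equivalent of the decorated generator above
async def hello():
    await asyncio.sleep(1)
```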
`await` suspends execution at a blocking asynchronous call. Its effect is, to a degree, like that of `yield`.

Note: "to a degree" means the effect is the same (both pause execution) but the two are not interchangeable:

- `await` cannot be used inside a plain generator;
- `yield from` cannot be used inside an `async def` coroutine.
In terms of what may follow them:

- `yield from` accepts an iterable, a `Future` object, or a coroutine object;
- `await` must be followed by a `Future` object or a coroutine object (an awaitable).
### Similarities and differences between gather and wait

There are two ways to register multiple coroutines with one event loop:

- `loop.run_until_complete(asyncio.wait(tasks))`
- `loop.run_until_complete(asyncio.gather(*tasks))`
#### How they receive arguments

The `tasks` that `asyncio.wait` receives must be a single list object holding the tasks. The list may contain task objects:
```python
tasks = [
    asyncio.ensure_future(func("A", 2)),
    asyncio.ensure_future(func("B", 3)),
    asyncio.ensure_future(func("C", 4))
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
```
The list may equally well contain raw coroutine objects; `asyncio.wait` wraps them in tasks itself:

```python
tasks = [
    func("A", 2),
    func("B", 3),
    func("C", 4)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
```
`asyncio.gather`, by contrast, takes the coroutines or tasks as positional arguments (note the `*` unpacking), and gathers can themselves be nested into a larger gather:

```python
group1 = asyncio.gather(*[func("A", i) for i in range(1, 3)])
group2 = asyncio.gather(*[func("B", i) for i in range(1, 5)])
group3 = asyncio.gather(*[func("C", i) for i in range(1, 7)])

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(group1, group2, group3))
```
#### How their return values differ

`asyncio.wait` returns two sets, `dones` and `pendings`:

- `dones`: the tasks that have completed
- `pendings`: the tasks not yet finished

So if you need the tasks' results, you have to collect them yourself with `result()`:

```python
dones, pendings = await asyncio.wait(tasks)

for task in dones:
    print('Task ret: ', task.result())
```
`asyncio.gather` returns the tasks' results directly:
```python
results = await asyncio.gather(*tasks)

for result in results:
    print('Task ret: ', result)
```
#### wait offers extra control

`asyncio.wait` supports two control knobs, both shown in the example below:

- the `return_when` parameter
- the `timeout` parameter
```python
import asyncio
import random

async def coro(tag):
    await asyncio.sleep(random.uniform(0.5, 5))

loop = asyncio.get_event_loop()

tasks = [coro(i) for i in range(1, 11)]

# return as soon as the first task completes
dones, pendings = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
print("Tasks done after round 1:", len(dones))

# wait at most 1 second for the remaining tasks
dones2, pendings2 = loop.run_until_complete(
    asyncio.wait(pendings, timeout=1))
print("Tasks done after round 2:", len(dones2))

# wait for everything that is still pending
dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2))
print("Tasks done after round 3:", len(dones3))

loop.close()
```
### Dynamically adding coroutines

asyncio offers two ways to add work to an event loop on the fly (here, a loop running in a separate thread).

First, scheduling plain synchronous functions with `call_soon_threadsafe`:
```python
import time
import asyncio
from queue import Queue
from threading import Thread

def start_loop(loop):
    # run the event loop forever in a dedicated thread
    asyncio.set_event_loop(loop)
    loop.run_forever()

def do_sleep(x, queue, msg=""):
    time.sleep(x)
    queue.put(msg)

if __name__ == '__main__':
    queue = Queue()

    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()

    print(time.ctime())

    # schedule plain functions on the loop thread
    new_loop.call_soon_threadsafe(do_sleep, 6, queue, "first")
    new_loop.call_soon_threadsafe(do_sleep, 3, queue, "second")

    while True:
        msg = queue.get()
        print("{} task finished..".format(msg))
        print(time.ctime())
```
Because the scheduled functions are synchronous, they block the loop thread and run one after another: 6 + 3 = 9 seconds in total.
Second, scheduling coroutines with `run_coroutine_threadsafe`:
```python
import time
import asyncio
from queue import Queue
from threading import Thread

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_sleep(x, queue, msg=""):
    await asyncio.sleep(x)
    queue.put(msg)

if __name__ == '__main__':
    queue = Queue()

    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()

    print(time.ctime())

    # submit coroutines to the loop running in the other thread
    asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "first"), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "second"), new_loop)

    while True:
        msg = queue.get()
        print("{} coroutine finished..".format(msg))
        print(time.ctime())
```
Because the coroutines are asynchronous, they run concurrently: max(6, 3) = 6 seconds in total.
### Hands-on: dynamically adding tasks via redis

Concurrent tasks usually follow a producer-consumer model, and the queue can be handled in a master-worker style: the master fetches messages from the queue, and the workers process them.

To keep things simple (and because coroutines favor a single-threaded style), the main thread here waits on a local result queue while a child thread runs the event loop that processes the tasks. The task queue itself is a redis list, consumed by an infinite loop in a consumer thread.
```python
import time
import redis
import asyncio
from queue import Queue
from threading import Thread

def get_redis():
    connection_pool = redis.ConnectionPool(host='127.0.0.1', db=2)
    return redis.Redis(connection_pool=connection_pool)

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_sleep(x, queue):
    await asyncio.sleep(x)
    queue.put("ok")

def consumer():
    while True:
        task = rcon.rpop("queue")
        if not task:
            time.sleep(1)
            continue
        # hand each redis message to the event loop in the other thread
        asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)

if __name__ == '__main__':
    print(time.ctime())

    new_loop = asyncio.new_event_loop()
    loop_thread = Thread(target=start_loop, args=(new_loop,))
    loop_thread.daemon = True
    loop_thread.start()

    rcon = get_redis()

    queue = Queue()
    consumer_thread = Thread(target=consumer)
    consumer_thread.daemon = True
    consumer_thread.start()

    while True:
        msg = queue.get()
        print("Coroutine finished..")
        print("Current time:", time.ctime())
```