网址

https://www.cnblogs.com/wongbingming/p/9028851.html4

线程的状态判断

使用t.is_alive()判断线程的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
from threading import Thread
import time

def printer():
time.sleep(8)
print('this is in printer')

t = Thread(target=printer)
print(t.is_alive()) # False

t.start()
print(t.is_alive()) # True

锁嵌套与可重入锁(Rlock)

有时候在同一个线程中,我们可能会多次请求同一资源(就是,获取同一锁钥匙),俗称锁嵌套

1
2
3
4
5
6
7
8
9
10
11
12
13
import threading

def main():
n = 0
lock = threading.Lock()
with lock:
for i in range(10):
n += 1
with lock:
print(n)

t1 = threading.Thread(target=main)
t1.start()

因为第二次获取锁时,发现锁已经被同一线程的人拿走了。自己也就理所当然,拿不到锁,程序就卡住了。
threading模块除了提供Lock锁之外,还提供了一种可重入锁RLock,专门来处理这个问题。

线程A获得可以重用锁,并可以多次成功获取,不会阻塞
最后要在线程A中acquire次数要和release相同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading

def main():
n = 0
# 生成可重入锁对象
lock = threading.RLock()
with lock:
for i in range(10):
n += 1
with lock:
print(n)

t1 = threading.Thread(target=main)
t1.start()
# 1 2 3 4 5 6 8 9 10

可重入锁,只在同一线程里,放松对锁钥匙的获取,其他与Lock并无二致。

防止死锁的加锁机制

死锁通常以下两种:

  1. 同一线程,嵌套获取同把锁。
  2. 多个线程,不按顺序同时获取多个锁。

使用ThreadLocal对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import threading

def run(x,n):
x += n
x -= n

def func(n):
# 给local对象增加一个x属性
# 每个线程都有local.x,就是线程的局部变量
local.x = num
for i in range(1000000):
run(local.x,n)
print(f'{threading.current_thread().name}------{local.x}')


if __name__ == '__main__':
# 创建一个全局的ThreadLocal对象
# 每个线程有独立的存储空间
# 每个线程对ThreadLocal对象都可以读写,但是互不影响
# 简单来说,就像进程一样,让不同的线程备份一个一模一样的变量
local = threading.local()
num = 0

t1 = threading.Thread(target=func,args=(6,))
t2 = threading.Thread(target=func,args=(9,))

t1.start()
t2.start()

t1.join()
t2.join()
print(num)

# Thread-2------0
# Thread-1------0
# 0

使用诀窍 : 只要修改线程对应的函数里需要锁定的变量即可

只要两个(或多个)线程获取嵌套锁时,按照固定顺序就能保证程序不会进入死锁状态。

辅助函数来对锁进行排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import threading
from contextlib import contextmanager


_local = threading.local()

@contextmanager
def acquire(*locks):
# 按对象标识符对锁排序
locks = sorted(locks, key=lambda x: id(x))

# 确保不违反 以前获取的锁的顺序
acquired = getattr(_local,'acquired',[])
if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
raise RuntimeError('锁定顺序冲突')

# 获取所有锁
acquired.extend(locks)
_local.acquired = acquired

try:
for lock in locks:
lock.acquire()
yield
finally:
# 按采集的相反顺序释放锁
for lock in reversed(locks):
lock.release()
del acquired[-len(locks):]


if __name__ == '__main__':
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
while True:
with acquire(x_lock):
with acquire(y_lock):
print('Thread-1')

def thread_2():
while True:
with acquire(y_lock):
with acquire(x_lock):
print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()
  • 表面上thread_1的先获取锁x,再获取锁y,而thread_2是先获取锁y,再获取x
  • 实际上,acquire函数,已经对xy两个锁进行了排序。
  • 所以thread_1hread_2都是以同一顺序来获取锁的,是不是造成死锁的。

线程消息通信机制任务协调

要实现对多个线程进行控制,其实本质上就是消息通信机制在起作用,利用这个机制发送指令,告诉线程,什么时候可以执行,什么时候不可以执行,执行什么内容。

线程中通信方法大致有如下三种:

  • threading.Event
  • threading.Condition
  • queue.Queue

Event事件

  • Python提供了非常简单的通信机制 Threading.Event,通用的条件变量。
  • 多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活

Event的三个函数

1
2
3
4
5
6
7
8
9
10
event = threading.Event()

# 重置event,使得所有该event事件都处于待命状态
event.clear()

# 等待接收event的指令,决定是否阻塞程序执行
event.wait()

# 发送event指令,使所有设置该event事件的线程执行
event.set()

简单来说就是:

  1. event.clear() : 重置event
  2. event.wait() : 阻塞线程
  3. event.set() : 解阻塞线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import time
import threading


class MyThread(threading.Thread):
def __init__(self, name, event):
super().__init__()
self.name = name
self.event = event

def run(self):
print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
# 等待event.set()后,才能往下执行
self.event.wait()
print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))


threads = []
event = threading.Event()

# 定义五个线程
[threads.append(MyThread(str(i), event)) for i in range(1,5)]

# 重置event,使得event.wait()起到阻塞作用
event.clear()

# 启动所有线程
[t.start() for t in threads]

print('等待5s...')
time.sleep(5)

print('唤醒所有线程...')
event.set()

# Thread: 1 start at Fri Jul 12 20:50:33 2019
# Thread: 2 start at Fri Jul 12 20:50:33 2019
# Thread: 3 start at Fri Jul 12 20:50:33 2019
# Thread: 4 start at Fri Jul 12 20:50:33 2019
# 等待5s...
# 唤醒所有线程...
# Thread: 1 finish at Fri Jul 12 20:50:38 2019
# Thread: 2 finish at Fri Jul 12 20:50:38 2019
# Thread: 3 finish at Fri Jul 12 20:50:38 2019
# Thread: 4 finish at Fri Jul 12 20:50:38 2019

Condition

Condition和Event 是类似的,并没有多大区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
cond = threading.Condition()

# 类似lock.acquire()
cond.acquire()

# 类似lock.release()
cond.release()

# 等待指定触发,同时会解锁,直到被notify才重新占有锁。
# 线程挂起,直到收到一个notify通知才会被唤醒继续运行
cond.wait()

# 发送指定,触发执行
# 通知其他线程执行代码,那些挂起的线程接到这个通知之后会开始运行
cond.notify()

# 如果wait状态线程比较多,notifyAll的作用就是通知所有线程
cond.notifyAll()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import threading, time

class Hider(threading.Thread):
def __init__(self, cond, name):
super(Hider, self).__init__()
self.cond = cond
self.name = name

def run(self):
time.sleep(1) #确保先运行Seeker中的方法
self.cond.acquire()

print(self.name + ': 我已经把眼睛蒙上了')
self.cond.notify()
self.cond.wait()
print(self.name + ': 我找到你了哦 ~_~')
self.cond.notify()

self.cond.release()
print(self.name + ': 我赢了')

class Seeker(threading.Thread):
def __init__(self, cond, name):
super(Seeker, self).__init__()
self.cond = cond
self.name = name

def run(self):
self.cond.acquire()
self.cond.wait()
print(self.name + ': 我已经藏好了,你快来找我吧')
self.cond.notify()
self.cond.wait()
self.cond.release()
print(self.name + ': 被你找到了,哎~~~')

cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()

# hider: 我已经把眼睛蒙上了
# seeker: 我已经藏好了,你快来找我吧
# hider: 我找到你了哦 ~_~
# hider: 我赢了
# seeker: 被你找到了,哎~~~

注意:

  1. 必须先要获取锁self.cond.acquire()后,才能使用self.cond.wait()self.cond.notify().而且,最后还要解开锁self.cond.release()
  2. 所以,我们一样可以使用上下文管理器with cond
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading

class Me(threading.Thread):
def run(self):
with cond:
print('hello1')
cond.notify()
cond.wait()
print(1)

class You(threading.Thread):
def run(self):
with cond:
print('hello')
cond.wait()
cond.notify()
print(2)

if __name__ == '__main__':
cond = threading.Condition()
You().start()
Me().start()

注意:两个线程的启动顺序一定不能颠倒
因为先启动的线程可能直到发完notify通知了,另一个线程才开始启动,一直处于状态,造成的结果就是2根线程就一直在那挂起.

Queue队列

从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用put()get() 操作来向队列中添加或者删除元素。

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from queue import Queue
# maxsize默认为0,不受限
# 一旦>0,而消息数又达到限制,q.put()也将阻塞
q = Queue(maxsize=0)

# 阻塞程序,等待队列消息。
q.get()

# 获取消息,设置超时时间
q.get(timeout=5.0)

# 发送消息
q.put()

# 等待所有的消息都被消费完
q.join()

# 以下三个方法,知道就好,代码中不要使用

# 查询当前队列的消息个数
q.qsize()

# 队列消息是否都被消费完,True/False
q.empty()

# 检测队列里消息是否已满
q.full()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from queue import Queue
from threading import Thread
import time

class Student(Thread):
def __init__(self, name, queue):
super().__init__()
self.name = name
self.queue = queue

def run(self):
while True:
# 阻塞程序,时刻监听老师,接收消息
msg = self.queue.get()
# 一旦发现点到自己名字,就赶紧答到
if msg == self.name:
print("{}:到!".format(self.name))
break


class Teacher:
def __init__(self, queue):
self.queue=queue

def call(self, student_name):
print("老师:{}来了没?".format(student_name))
# 发送消息,要点谁的名
self.queue.put(student_name)


queue = Queue()
teacher = Teacher(queue=queue)
s1 = Student(name="小明", queue=queue).start()
s2 = Student(name="小亮", queue=queue).start()

print('开始点名~')
teacher.call('小明')
time.sleep(1)
teacher.call('小亮')

# 开始点名~
# 老师:小明来了没?
# 小明:到!
# 老师:小亮来了没?
# 小亮:到!

消息队列的先进先出

消息队列不只有queue.Queue这一个类,还有queue.LifoQueuequeue.PriorityQueue这两个类

queue.Queue:先进先出队列 : 先进入队列的消息,将优先被消费。
queue.LifoQueue:后进先出队列
queue.PriorityQueue:优先级队列

1
2
3
4
5
6
7
8
9
import queue

q = queue.Queue()

for i in range(5):
q.put(i)

while not q.empty():
print(q.get())
1
2
3
4
5
6
7
8
9
import queue

q = queue.LifoQueue()

for i in range(5):
q.put(i)

while not q.empty():
print q.get()

线程中的信息隔离

信息隔离 : 有两个线程,线程A里的变量,和线程B里的变量值不能共享。

使用threading.local类,可以很方便的控制变量的隔离,即使是同一个变量,在不同的线程中,其值也是不能共享的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from threading import local, Thread, currentThread

class MyThread(Thread):
def run(self):
print("赋值前-子线程:", currentThread(),local_data.__dict__)
# 在子线程中存入name这个变量
local_data.name = self.getName()
print("赋值后-子线程:",currentThread(), local_data.__dict__)

if __name__ == '__main__':
# 定义一个local实例
local_data = local()

print("开始前-主线程:",local_data.__dict__)

t1 = MyThread()
t1.start()
t1.join()

t2 = MyThread()
t2.start()
t2.join()

print("结束后-主线程:",local_data.__dict__)

# 开始前-主线程: {}
# 赋值前-子线程: <MyThread(Thread-1, started 10008)> {}
# 赋值后-子线程: <MyThread(Thread-1, started 10008)> {'name': 'Thread-1'}
# 赋值前-子线程: <MyThread(Thread-2, started 8100)> {}
# 赋值后-子线程: <MyThread(Thread-2, started 8100)> {'name': 'Thread-2'}
# 结束后-主线程: {}
  • local实际是一个字典型的对象,其内部可以以key-value的形式存入你要做信息隔离的变量。
  • local实例可以是全局唯一的,只有一个。因为你在给local存入或访问变量时,它会根据当前的线程的不同从不同的存储空间存入或获取。

得出以下三点结论:

  1. 主线程中的变量,不会因为其是全局变量,而被子线程获取到;
  2. 主线程也不能获取到子线程中的变量;
  3. 子线程与子线程之间的变量也不能互相访问。

简单来说 : 主线程和多个子线程之间两两不互通.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import threading
from functools import partial
from socket import socket, AF_INET, SOCK_STREAM

class LazyConnection:
def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
self.address = address
self.family = AF_INET
self.type = SOCK_STREAM
self.local = threading.local()

def __enter__(self):
if hasattr(self.local, 'sock'):
raise RuntimeError('Already connected')
# 把socket连接存入local中
self.local.sock = socket(self.family, self.type)
self.local.sock.connect(self.address)
return self.local.sock

def __exit__(self, exc_ty, exc_val, tb):
self.local.sock.close()
del self.local.sock

def spider(conn, website):
with conn as s:
header = 'GET / HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n'.format(website)
s.send(header.encode("utf-8"))
resp = b''.join(iter(partial(s.recv, 100000), b''))
print('Got {} bytes'.format(len(resp)))

if __name__ == '__main__':
# 建立一个TCP连接
conn = LazyConnection(('www.sina.com.cn', 80))

# 爬取两个页面
t1 = threading.Thread(target=spider, args=(conn,"news.sina.com.cn"))
t2 = threading.Thread(target=spider, args=(conn,"blog.sina.com.cn"))
t1.start()
t2.start()
t1.join()
t2.join()

创建线程池

创建线程池是通过concurrent.futures函数库中的ThreadPoolExecutor类来实现的。

使用map方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time
import threading
from concurrent.futures import ThreadPoolExecutor


def target(num):
for i in range(5):
num += 1
time.sleep(1)
return num

with ThreadPoolExecutor(5) as exe:
res = exe.map(target,range(5))

for each in res:
print(each)
# 5 6 7 8 9

使用submit方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time
import threading
from concurrent.futures import ThreadPoolExecutor


def target():
for i in range(5):
print('running thread-{}:{}'.format(threading.get_ident(), i))
time.sleep(1)

#: 生成线程池最大线程为5个
pool = ThreadPoolExecutor(5)

for i in range(10):
pool.submit(target) # 往线程中提交,并运行

自定义线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import threading
from queue import Queue

def target(q):
while True:
# 阻塞,等待queue.put("start")的执行
msg = q.get()
for i in range(5):
print('running thread-{}:{}'.format(threading.get_ident(), i))
time.sleep(1)

def pool(workers,queue):
for n in range(workers):
t = threading.Thread(target=target, args=(queue,))
t.daemon = True
t.start()

if __name__ == '__main__':
queue = Queue()
# 创建一个线程池:并设置线程数为5
pool(5, queue)

for i in range(100):
queue.put("start")

# 消息都被消费才能结束
queue.join()

迭代器和可迭代对象

从代码本质来看:

  1. 可迭代对象就是实现了__iter__方法
  2. 迭代器就是实现了__iter____next__方法

所以造成了可迭代对象和迭代器功能上最大的区别:

迭代器可以不使用for循环来间断获取元素值。可以直接使用next()方法来实现

委派生成器的作用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 子生成器
def average_gen():
total = 0
count = 0
average = 0
while True:
new_num = yield average
count += 1
total += new_num
average = total/count

# 委托生成器
def proxy_gen():
while True:
yield from average_gen()

# 调用方
def main():
calc_average = proxy_gen()
next(calc_average) # 预激下生成器
print(calc_average.send(10)) # 打印:10.0
print(calc_average.send(20)) # 打印:15.0
print(calc_average.send(30)) # 打印:20.0

if __name__ == '__main__':
main()

上面的委派生成器只是起到了双向通道的作用.
我们使用的yield from本身也是一个双向通道,但是yield from帮我们做了很多的异常处理,而且全面,而这些如果我们要自己去实现的话,一个是编写代码难度增加,写出来的代码可读性极差

asyncio框架

await和yield from的区别

1
2
3
@asyncio.coroutine
def hello():
yield from asyncio.sleep(1)

只要在一个生成器函数头部用上 @asyncio.coroutine 装饰器就能将这个函数对象,标记为协程对象。实际上,它的本质还是一个生成器。标记后,它实际上已经可以当成协程使用。

await用于挂起阻塞的异步调用接口。其作用在一定程度上类似于yield。

注意:一定程度上意思是效果上一样(都能实现暂停的效果),但是功能上却不兼容。就是:

  • 不能在生成器中使用await
  • 不能在 async 定义的协程中使用yield

在使用方面:

  • yield from 后面可接 可迭代对象,也可接future对象/协程对象;
  • await 后面必须要接 future对象/协程对象

gather与wait的异同

把多个协程注册进一个事件循环中有两种方法:

  • loop.run_until_complete(asyncio.wait(tasks))
  • loop.run_until_complete(asyncio.gather(*tasks))

接收参数方式

  • asyncio.wait接收的tasks,必须是一个list对象,这个list对象里,存放多个的task。
    list对象也可以存放协程对象:

    1
    2
    3
    4
    5
    6
    7
    8
    tasks=[
    asyncio.ensure_future(func("A", 2)),
    asyncio.ensure_future(func("B", 3)),
    asyncio.ensure_future(func("C", 4))
    ]

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    1
    2
    3
    4
    5
    6
    7
    8
    tasks=[
    func("A", 2),
    func("B", 3),
    func("C", 4)
    ]

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
  • ```python
    group1 = asyncio.gather([func(“A” ,i) for i in range(1, 3)])
    group2 = asyncio.gather(
    [func(“B”, i) for i in range(1, 5)])
    group3 = asyncio.gather(*[func(“B”, i) for i in range(1, 7)])

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(group1, group2, group3))

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19



    #### 返回结果不同

    - `asyncio.wait` 返回`dones`和`pendings`

    >- `dones`:表示已经完成的**任务**
    >- `pendings`:表示未完成的**任务**
    >
    >
    >
    >所以,如果我们需要获取**任务的运行结果**,需要使用`result`手工去收集获取。
    >
    >```python
    >dones, pendings = await asyncio.wait(tasks)
    >
    >for task in dones:
    > print('Task ret: ', task.result())
  • asyncio.gather 直接返回任务的运行结果。

    1
    2
    3
    4
    results = await asyncio.gather(*tasks)

    for result in results:
    print('Task ret: ', result)

wait有控制功能

  1. 使用return_when参数
  2. 使用timeout参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import random


async def coro(tag):
await asyncio.sleep(random.uniform(0.5, 5))

loop = asyncio.get_event_loop()

tasks = [coro(i) for i in range(1, 11)]


# 【控制运行任务数】:运行第一个任务就返回
# FIRST_COMPLETED :第一个任务完全返回
# FIRST_EXCEPTION:产生第一个异常返回
# ALL_COMPLETED:所有任务完成返回 (默认选项)
dones, pendings = loop.run_until_complete(
asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
print("第一次完成的任务数:", len(dones))


# 【控制时间】:运行一秒后,就返回
dones2, pendings2 = loop.run_until_complete(
asyncio.wait(pendings, timeout=1))
print("第二次完成的任务数:", len(dones2))


# 【默认】:所有任务完成后返回
dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2))

print("第三次完成的任务数:", len(dones3))

loop.close()

动态添加协程

asyncio中将协程态添加到事件循环中的两种方法:

  • 主线程是同步
  • 主线程是异步

主线程是同步的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import time
import asyncio
from queue import Queue
from threading import Thread

def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()

def do_sleep(x, queue, msg=""):
time.sleep(x)
queue.put(msg)

if __name__ == '__main__':
queue = Queue()

new_loop = asyncio.new_event_loop()

# 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start()

print(time.ctime())

# 动态添加两个协程
# 这种方法,在主线程是同步的
new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一个")
new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二个")

while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())

# Sat Jul 13 10:15:28 2019
# 第一个 协程运行完..
# Sat Jul 13 10:15:34 2019
# 第二个 协程运行完..
# Sat Jul 13 10:15:37 2019

由于是同步的,所以总共耗时6+3=9秒.

主线程是异步的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import time
import asyncio
from queue import Queue
from threading import Thread

def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()

async def do_sleep(x, queue, msg=""):
await asyncio.sleep(x)
queue.put(msg)

if __name__ == '__main__':
queue = Queue()

new_loop = asyncio.new_event_loop()

# 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start()

print(time.ctime())

# 动态添加两个协程
# 这种方法,在主线程是异步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop)

while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())

# Sat Jul 13 10:39:27 2019
# 第二个 协程运行完..
# Sat Jul 13 10:39:30 2019
# 第一个 协程运行完..
# Sat Jul 13 10:39:33 2019

由于是同步的,所以总共耗时max(6, 3)=6秒

实战:利用redis实现动态添加任务

对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。

为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import time
import redis
import asyncio
from queue import Queue
from threading import Thread

def get_redis():
connection_pool = redis.ConnectionPool(host='127.0.0.1', db=2)
return redis.Redis(connection_pool=connection_pool)

def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()

async def do_sleep(x, queue):
await asyncio.sleep(x)
queue.put("ok")

def consumer():
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)


if __name__ == '__main__':
print(time.ctime())
new_loop = asyncio.new_event_loop()

# 定义一个线程,运行一个事件循环对象,用于实时接收新任务
loop_thread = Thread(target=start_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()

# 创建redis连接
rcon = get_redis()

queue = Queue()

# 子线程:实时接收来自Redis的消息队列,并实时往事件对象容器中添加新任务。
consumer_thread = Thread(target=consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()

while True:
msg = queue.get()
print("协程运行完..")
print("当前时间:", time.ctime())