来,控制一下 Goroutine 的并发数量

1
2
3
4
5
6
7
8
9
10
func main() {
userCount := math.MaxInt64
for i := 0; i < userCount; i++ {
go func(i int) {
// 做一些各种各样的业务逻辑处理
fmt.Printf("go func: %d\n", i)
time.Sleep(time.Second)
}(i)
}
}

在这里,假设 userCount 是一个外部传入的参数(不可预测,有可能值非常大),有人会全部丢进去循环。想着全部都并发 goroutine 去同时做某一件事。这将耗费大量资源,很可能会击穿电脑。

尝试 chan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
userCount := 10
ch := make(chan bool, 2)
for i := 0; i < userCount; i++ {
ch <- true
go Read(ch, i)
}

//time.Sleep(time.Second)
}

func Read(ch chan bool, i int) {
fmt.Printf("go func: %d\n", i)
<- ch
}

输出结果:

1
2
3
4
5
6
7
8
9
go func: 1
go func: 2
go func: 3
go func: 4
go func: 5
go func: 6
go func: 7
go func: 8
go func: 0

嗯,我们似乎很好的控制了 2 个 2 个的 “顺序” 执行多个 goroutine。但是,问题出现了。你仔细数一下输出结果,才 9 个值?

这明显就不对。原因出在当主协程结束时,子协程也是会被终止掉的。因此剩余的 goroutine 没来及把值输出,就被送上路了(不信你把 time.Sleep 打开看看,看看输出数量)

尝试 sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var wg = sync.WaitGroup{}

func main() {
userCount := 10
for i := 0; i < userCount; i++ {
wg.Add(1)
go Read(i)
}

wg.Wait()
}

func Read(i int) {
defer wg.Done()
fmt.Printf("go func: %d\n", i)
}

单纯的使用 sync.WaitGroup 也不行。没有控制到同时并发的 goroutine 数量(代指达不到本文所要求的目标)

单纯简单使用 channel 或 sync 都有明显缺陷,不行。我们再看看组件配合能不能实现

尝试 chan + sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var wg = sync.WaitGroup{}

func main() {
userCount := 10
ch := make(chan bool, 2)
for i := 0; i < userCount; i++ {
wg.Add(1)
ch <- true
go Read(ch, i)
}

wg.Wait()
}

func Read(ch chan bool, i int) {
defer wg.Done()
fmt.Printf("go func: %d, time: %d\n", i, time.Now().Unix())
time.Sleep(time.Second)
<-ch
}

输出结果:

1
2
3
4
5
6
7
8
9
10
go func: 9, time: 1547911938
go func: 1, time: 1547911938
go func: 6, time: 1547911939
go func: 7, time: 1547911939
go func: 8, time: 1547911940
go func: 0, time: 1547911940
go func: 3, time: 1547911941
go func: 2, time: 1547911941
go func: 4, time: 1547911942
go func: 5, time: 1547911942

从输出结果来看,确实实现了控制 goroutine 以 2 个 2 个的数量去执行我们的 “业务逻辑”,当然结果集也理所应当的是乱序输出

方案一:简单 Semaphore

在确立了简单使用 chan + sync 的方案是可行后,我们重新将流转逻辑封装为 gsema,主程序变成如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import (
"fmt"
"time"

"github.com/EDDYCJY/gsema"
)

var sema = gsema.NewSemaphore(3)

func main() {
userCount := 10
for i := 0; i < userCount; i++ {
go Read(i)
}

sema.Wait()
}

func Read(i int) {
defer sema.Done()
sema.Add(1)

fmt.Printf("go func: %d, time: %d\n", i, time.Now().Unix())
time.Sleep(time.Second)
}

在上述代码中,程序执行流程如下:

  • 设置允许的并发数目为 3 个
  • 循环 10 次,每次启动一个 goroutine 来执行任务
  • 每一个 goroutine 在内部利用 sema 进行调控是否阻塞
  • 按允许并发数逐渐释出 goroutine,最后结束任务

看上去人模人样,没什么严重问题。但却有一个 “大” 坑,认真看到第二点 “每次启动一个 goroutine” 这句话。这里有点问题,提前产生那么多的 goroutine 会不会有什么问题,接下来一起分析下利弊,如下:

利:

  • 适合量不大、复杂度低的使用场景
    • 几百几千个、几十万个也是可以接受的
    • 实际业务逻辑在运行前就已经被阻塞等待了(因为并发数受限),基本实际业务逻辑损耗的性能比 goroutine 本身大
    • goroutine 本身很轻便,仅损耗极少许的内存空间和调度。这种等待响应的情况都是躺好了,等待任务唤醒
  • Semaphore 操作复杂度低且流转简单,容易控制

弊:

  • 不适合量很大、复杂度高的使用场景
    • 有几百万、几千万个 goroutine 的话,就浪费了大量调度 goroutine 和内存空间。恰好你的服务器也接受不了的话
  • Semaphore 操作复杂度提高,要管理更多的状态

方案二:灵活 chan + sync

要控制输入的数量,以此达到改变允许并发运行 goroutine 的数量。我们仔细想想,要做出如下改变:

  • 输入/输出要抽离,才可以分别控制
  • 输入/输出要可变,理所应当在 for-loop 中(可设置数值的地方)
  • 允许改变 goroutine 并发数量,但它也必须有一个最大值(因为允许改变是相对)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"sync"
"time"
)

var wg sync.WaitGroup

func main() {
userCount := 10
ch := make(chan int, 5)
for i := 0; i < userCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for d := range ch {
fmt.Printf("go func: %d, time: %d\n", d, time.Now().Unix())
time.Sleep(time.Second * time.Duration(d))
}
}()
}

for i := 0; i < 10; i++ {
ch <- 1
ch <- 2
//time.Sleep(time.Second)
}

close(ch)
wg.Wait()
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
...
go func: 1, time: 1547950567
go func: 3, time: 1547950567
go func: 1, time: 1547950567
go func: 2, time: 1547950567
go func: 2, time: 1547950567
go func: 3, time: 1547950567
go func: 1, time: 1547950568
go func: 2, time: 1547950568
go func: 3, time: 1547950568
go func: 1, time: 1547950568
go func: 3, time: 1547950569
go func: 2, time: 1547950569

在 “方案二” 中,我们可以随时随地的根据新的业务需求,做如下事情:

  • 变更 channel 的输入数量
  • 能够根据特殊情况,变更 channel 的循环值
  • 变更最大允许并发的 goroutine 数量

方案三:第三方库

  • go-playground/pool
  • nozzle/throttler
  • Jeffail/tunny
  • panjf2000/ants