sync.Mutex

  • Mutex 为互斥锁,Lock() 加锁,Unlock() 解锁
  • 在一个 goroutine 获得 Mutex 后,其他 goroutine 只能等到这个 goroutine 释放该 Mutex
  • 使用 Lock() 加锁后,不能再继续对其加锁,直到利用 Unlock() 解锁后才能再加锁
  • 在 Lock() 之前使用 Unlock() 会导致 panic 异常
  • 已经锁定的 Mutex 并不与特定的 goroutine 相关联,这样可以利用一个 goroutine 对其加锁,再利用其他 goroutine 对其解锁
  • 在同一个 goroutine 中的 Mutex 解锁之前再次进行加锁,会导致死锁
  • 适用于读写不确定,并且只有一个读或者写的场景

总结:对于同一个mutex

  • 同一个goroutine不能重复执行mutex.Lock(),否则产生deadlock
  • 在goroutineA执行mutex.Lock()之后,其他goroutine如果执行到mutex.Lock()就只能阻塞,直到goroutineA执行mutex.Unlock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"sync"
"time"
)

func main() {
var mutex sync.Mutex
fmt.Println("Lock the lock")
mutex.Lock()
fmt.Println("The lock is locked")
channels := make([]chan int, 4)
for i := 0; i < 4; i++ {
channels[i] = make(chan int)
go func(i int, c chan int) {
fmt.Println("Not lock: ", i)
mutex.Lock()
fmt.Println("Locked: ", i)
time.Sleep(time.Second)
fmt.Println("Unlock the lock: ", i)
mutex.Unlock()
c <- i
}(i, channels[i])
}
fmt.Println("Waiting")
time.Sleep(time.Second)
fmt.Println("Unlock the lock")
mutex.Unlock()
time.Sleep(time.Second)

for _, c := range channels {
<-c
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
output:

Lock the lock
The lock is locked
Waiting
Not lock: 3
Not lock: 1
Not lock: 2
Not lock: 0
Unlock the lock
Locked: 3
Unlock the lock: 3
Locked: 1
Unlock the lock: 1
Locked: 2
Unlock the lock: 2
Locked: 0
Unlock the lock: 0

在解锁之前加锁会导致死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import (
"fmt"
"sync"
)

func main() {
var mutex sync.Mutex
mutex.Lock()
fmt.Println("Locked")
mutex.Lock()
}

程序输出

1
2
Locked
fatal error: all goroutines are asleep - deadlock!

sync.RWMutex

  • RWMutex 是单写多读锁,该锁可以加多个读锁或者一个写锁
  • 读锁占用的情况下会阻止写,不会阻止读,多个 goroutine 可以同时获取读锁
  • 写锁会阻止其他 goroutine(无论读和写)进来,整个锁由该 goroutine 独占
  • 适用于读多写少的场景

Lock() 和 Unlock()

  • Lock() 加写锁,Unlock() 解写锁
  • 如果在加写锁之前已经有其他的读锁和写锁,则 Lock() 会阻塞直到该锁可用,为确保该锁可用,已经阻塞的 Lock() 调用会从获得的锁中排除新的读取器,即写锁权限高于读锁,有写锁时优先进行写锁定
  • 在 Lock() 之前使用 Unlock() 会导致 panic 异常

RLock() 和 RUnlock()

  • RLock() 加读锁,RUnlock() 解读锁
  • RLock() 加读锁时,如果存在写锁,则无法加读锁;当只有读锁或者没有锁时,可以加读锁,读锁可以加载多个
  • RUnlock() 解读锁,RUnlock() 撤销单次 RLock() 调用,对于其他同时存在的读锁则没有效果
  • 在没有读锁的情况下调用 RUnlock() 会导致 panic 错误
  • RUnlock() 的个数不得多余 RLock(),否则会导致 panic 错误
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"sync"
"fmt"
"time"
)

func main() {
var mutex *sync.RWMutex
mutex = new(sync.RWMutex)
fmt.Println("Lock the lock")
mutex.Lock()
fmt.Println("The lock is locked")

channels := make([]chan int, 4)
for i := 0; i < 4; i++ {
channels[i] = make(chan int)
go func(i int, c chan int) {
fmt.Println("Not lock: ", i)
mutex.Lock()
fmt.Println("Locked: ", i)
fmt.Println("Unlock the lock: ", i)
mutex.Unlock()
c <- i
}(i, channels[i])
}
fmt.Println("waiting")
time.Sleep(time.Second)
fmt.Println("Unlock the lock")
mutex.Unlock()
time.Sleep(time.Second)

for _, c := range channels {
<-c
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// output:

Lock the lock
The lock is locked
waiting
Not lock: 0
Not lock: 3
Not lock: 1
Not lock: 2
Unlock the lock
Locked: 0
Unlock the lock: 0
Locked: 3
Unlock the lock: 3
Locked: 1
Unlock the lock: 1
Locked: 2
Unlock the lock: 2

Lock() 和 RLock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"fmt"
"sync"
"time"
)

func main() {
var mutex *sync.RWMutex
mutex = new(sync.RWMutex)
fmt.Println("Lock the lock")
mutex.Lock()
fmt.Println("The lock is locked")

channels := make([]chan int, 4)
for i := 0; i < 4; i++ {
channels[i] = make(chan int)
go func(i int, c chan int) {
fmt.Println("Not read lock: ", i)
mutex.RLock()
fmt.Println("Read Locked: ", i)

time.Sleep(time.Second)
fmt.Println("Unlock the read lock: ", i)
mutex.RUnlock()
c <- i
}(i, channels[i])
}
time.Sleep(time.Second)
fmt.Println("Unlock the lock")
mutex.Unlock()
time.Sleep(time.Second)

for _, c := range channels {
<-c
}
}

程序输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Lock the lock
The lock is locked
Not read lock: 0
Not read lock: 3
Not read lock: 1
Not read lock: 2
Unlock the lock
Read Locked: 2
Read Locked: 0
Read Locked: 3
Read Locked: 1
Unlock the read lock: 1
Unlock the read lock: 3
Unlock the read lock: 2
Unlock the read lock: 0

Unlock() 使用之前不存在 Lock()

1
2
3
4
5
6
7
8
9
10
11
package main

import (
"sync"
)

func main() {
var rwmutex *sync.RWMutex
rwmutex = new(sync.RWMutex)
rwmutex.Unlock()
}

程序输出:

1
fatal error: sync: Unlock of unlocked RWMutex

RWMutex 使用不当导致的死锁

示例1:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import (
"sync"
)

func main() {
var rwmutex *sync.RWMutex
rwmutex = new(sync.RWMutex)
rwmutex.Lock()
rwmutex.Lock()
}

程序输出:

1
fatal error: all goroutines are asleep - deadlock!

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import (
"sync"
)

func main() {
var rwmutex *sync.RWMutex
rwmutex = new(sync.RWMutex)
rwmutex.Lock()
rwmutex.RLock()
}

程序输出:

1
fatal error: all goroutines are asleep - deadlock!

RUnlock() 之前不存在 RLock()

1
2
3
4
5
6
7
8
9
10
11
package main

import (
"sync"
)

func main() {
var rwmutex *sync.RWMutex
rwmutex = new(sync.RWMutex)
rwmutex.RUnlock()
}

程序输出:

1
fatal error: sync: RUnlock of unlocked RWMutex

RUnlock() 个数多于 RLock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"sync"
)

func main() {
var rwmutex *sync.RWMutex
rwmutex = new(sync.RWMutex)
rwmutex.RLock()
rwmutex.RLock()
rwmutex.RUnlock()
rwmutex.RUnlock()
rwmutex.RUnlock()
}

程序输出:

1
fatal error: sync: RUnlock of unlocked RWMutex

sync.WaitGroup

官方文档对 WaitGroup 的描述是:一个 WaitGroup 对象可以等待一组协程结束。使用方法是:

  1. main协程通过调用 wg.Add(delta int) 设置worker协程的个数,然后创建worker协程;
  2. worker协程执行结束以后,都要调用 wg.Done()
  3. main协程调用 wg.Wait() 且被block,直到所有worker协程全部执行结束后返回。

sync.WaitGroup只有3个方法:

  1. Add()
  2. Done()
  3. Wait()

其中Done()是Add(-1)的别名。简单的来说,使用Add()添加计数,Done()减掉一个计数,计数不为0, 阻塞Wait()的运行

最标准的用法

主goroutine调用Add来设置要等待的goroutine的数量。然后每个goroutine运行并在完成时调用Done。

1
2
3
4
5
6
7
8
9
10
11
func main() {
wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go func(i int) {
fmt.Println(i)
wg.Done()
}(i)
}
wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
// 省略部分代码 ...
var wg sync.WaitGroup
for _, task := range tasks {
task := task
wg.Add(1)
go func() {
task()
wg.Done()
}()
}
wg.Wait()
// 省略部分代码...
}

WaitGroup 正确使用的要素:

  1. wg.Done 必须在wg.Add 之后执行,所以要保证两个函数都在main协程中调用;
  2. wg.Done 在 worker协程里调用,尤其要保证调用一次,不能因为 panic 或任何原因导致没有执行(建议使用 defer wg.Done());
  3. wg.Donewg.Wait 在时序上是没有先后。

细心的朋友可能会发现一行非常诡异的代码:

1
task := task

Go 对 array/slice 进行遍历时,runtime 会把 task[i] 拷贝到 task 的内存地址,下标 i 会变,而 task 的内存地址不会变。如果不进行这次赋值操作,所有 goroutine 可能读到的都是最后一个task。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"unsafe"
)

func main() {
tasks := []func(){
func() { fmt.Printf("1. ") },
func() { fmt.Printf("2. ") },
}

for idx, task := range tasks {
task()
fmt.Printf("遍历 = %v, ", unsafe.Pointer(&task))
fmt.Printf("下标 = %v, ", unsafe.Pointer(&tasks[idx]))
task := task
fmt.Printf("局部变量 = %v\\n", unsafe.Pointer(&task))
}
}

// output:
// 1. 遍历 = 0x40c140, 下标 = 0x40c138, 局部变量 = 0x40c150
// 2. 遍历 = 0x40c140, 下标 = 0x40c13c, 局部变量 = 0x40c158

WaitGroup对象不是一个引用类型

WaitGroup对象不是一个引用类型,在通过函数传值的时候需要使用地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go f(i, &wg)
}
wg.Wait()
}

// 一定要通过指针传值,不然进程会进入死锁状态
func f(i int, wg *sync.WaitGroup) {
fmt.Println(i)
wg.Done()
}

sync.Once

sync.Once 是 Golang package 中使方法只执行一次的对象实现,作用与 init 函数类似。但也有所不同。

  • init 函数是在文件包首次被加载的时候执行,且只执行一次
  • sync.Once 是在代码运行中需要的时候执行,且只执行一次

当一个函数不希望程序在一开始的时候就被执行的时候,我们可以使用 sync.Once

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"sync"
)

func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}

// Output:
// Only once

sync.Cond

与互斥量不同,条件变量的作用并不是保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其他因此而被阻塞的线程

条件变量总是与互斥量组合使用。

  • 互斥量为共享数据的访问提供互斥支持,
  • 条件变量可以就共享数据的状态的变化向相关线程发出通知。
1
2
3
4
5
lock := new(sync.Mutex)
cond := sync.NewCond(lock)

// 也可以写成一行
cond := sync.NewCond(new(sync.Mutex))
1
2
3
4
5
6
// 方法
cond.L.Lock()
cond.L.Unlock()
cond.Wait()
cond.Signal()
cond.Broadcast()
  • sync.NewCond(l Locker): 新建一个 sync.Cond 变量。注意该函数需要一个 Locker 作为必填参数,这是因为在 cond.Wait() 中底层会涉及到 Locker 的锁操作。
  • cond.L.Lock()cond.L.Unlock():也可以使用lock.Lock()lock.Unlock(),完全一样,因为是指针转递
  • cond.Wait():等待被唤醒。唤醒期间会解锁并切走 goroutine。
  • cond.Signal():只唤醒一个最先 Wait 的 goroutine。若没有Wait(),也不会报错。Signal()通知的顺序是根据原来加入通知列表(Wait())的先入先出
  • cond.Broadcast(): 将全部 Wait 的 goroutine 都唤醒。若没有Wait(),也不会报错

一句话总结:sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
"fmt"
"sync"
"time"
)

var sharedRsc = false

func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for sharedRsc == false {
fmt.Println("goroutine1 wait")
c.Wait()
}
fmt.Println("goroutine1", sharedRsc)
c.L.Unlock()
wg.Done()
}()

go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for sharedRsc == false {
fmt.Println("goroutine2 wait")
c.Wait()
}
fmt.Println("goroutine2", sharedRsc)
c.L.Unlock()
wg.Done()
}()

fmt.Println("waiting")
// this one writes changes to sharedRsc
time.Sleep(2 * time.Second)
c.L.Lock()
fmt.Println("main goroutine ready")
sharedRsc = true
c.Broadcast()
fmt.Println("main goroutine broadcast")
c.L.Unlock()
wg.Wait()
}

// output:

// waiting
// goroutine 2
// goroutine2 wait
// goroutine 1
// goroutine1 wait
// main goroutine ready
// main goroutine broadcast
// goroutine1 true
// goroutine2 true

sync.Cond 的 Wait 过程,可以简单用下图表示:

sync.Cond wait 过程

sync.Cond 的惯用法及使用注意事项

  1. sync.Cond不能拷贝,否则将会造成panic("sync.Cond is copied")错误

  2. Wait 的调用一定要放在 Lock 和 UnLock 中间,否则将会造成panic("sync: unlock of unlocked mutex")错误。

    1
    2
    3
    4
    5
    6
    c.L.Lock()
    for !condition() {
    c.Wait()
    }
    ... make use of condition ...
    c.L.Unlock()

    原因很简单,因为Wait源码内部执行了Unlock:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func (c *Cond) Wait() {
    c.checker.check()
    // 这里不要误解了,c.notify 并不是 notifyList 里面的 notify 属性,而是 notifyList 本身
    t := runtime_notifyListAdd(&c.notify)
    // 注意这里,必须先解锁,因为 runtime_notifyListWait 要切走 goroutine
    // 所以这里要解锁,要不然其他 goroutine 没法获取到锁了
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    // 这里已经唤醒了,因此需要再度锁上
    c.L.Lock()
    }
  3. Wait 调用的条件检查一定要放在 for 循环中。这是因为当 Boardcast 唤醒时,有可能其他 goroutine 先于当前 goroutine 唤醒并抢到锁,导致轮到当前 goroutine 抢到锁的时候,条件又不再满足了。因此,需要将条件检查放在 for 循环中。

  4. Signal 和 Boardcast 两个唤醒操作不需要加锁。

sync.ErrGroup

Go 团队发布的第一个 goroutines 的管理工具是 sync.WaitGroup,这个工具允许你创建 WaitGroup 去等待一定数量的 goroutines 执行完成。这里有个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()

WaitGroup 使你在处理并发任务时对 goroutines 的创建和停止的数量控制都变的更加简单。每次你创建 goroutine 的时候只要调用 Add() 就可以了。当这个任务结束调用 wg.Done()。等待所有的任务完成,调用 wg.Wait()。但是用 WatiGroup 唯一的问题就是当你的 goroutines 出错时,你不能捕获到错误的原因。

sync.ErrGroup 相当于为 sync.WaitGroup 增加了错误返回的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var g errgroup.Group
var urls = []string{
"errorURL1",
"errorURL2",
"errorURL3",
}
for _, url := range urls {
url := url
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}

g.Go() 这个方法不仅允许你传一个匿名的函数,而且还能捕获错误信息,你只要像这样返回一个错误 return err。这对使用 goroutines 的开发者来说在功能上是一个很大的提升。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main
import (
"fmt"
"golang.org/x/sync/errgroup"
"errors"
)
func main() {
group := new(errgroup.Group)
nums := []int{-1, 0, 1}

for _, num := range nums {
tempNum := num // 子协程中若直接访问num,则可能是同一个变量,所以要用临时变量
// 子协程
group.Go(func() error {
if tempNum < 0 {
return errors.New("tempNum < 0 !!!")
}
fmt.Println("tempNum:",tempNum)
return nil
})
}
// 捕获err
if err := group.Wait(); err != nil {
fmt.Println("Get errors: ", err)
} else {
fmt.Println("Get all num successfully!")
}
}

sync.semaphore.WaitGroup

sync.semaphore用途

  • 需求:要限制服务的并发次数,但是每个API要求的算力还不一样,
  • 怎么做,如果用上sync/semaphore就行。

sync.singleflight.Group

singleflight.Group用途

  • singleflight.Group是大并发下面的一把利器。一个请求如果访问1s,同样的请求来了500路,如果你的机器只能支持100路,处理完请求要5s。如果用上这个库,同样的机器,只要1s
  • 原因: singleflight.Group会缓存那一瞬间的并发请求(相同key值)

使用流程

  • 声明总权重数: s := semaphore.NewWeighted(10),申请的总权重数是10
  • 申请你的权重,不同的业务使用不同的权重,比如这里的mysql使用了2,cached使用了1.: b := c.TryAcquire(c.weighted),从总权重数减去你申请的
  • 运行一些业务,这里使用sleep模拟真实业务运行时间: time.Sleep(time.Second)
  • 释放你的权重,有借有还,再借不难: c.Release(c.weighted),恢复权重数至你申请之前
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"context"
"fmt"
"sync"
"time"

"golang.org/x/sync/semaphore"
)

func doSomething(u string) {// 模拟抓取任务的执行
fmt.Println(u)
time.Sleep(2 * time.Second)
}

const (
Limit = 3 // 同時并行运行的goroutine上限
Weight = 1 // 每个goroutine获取信号量资源的权重
)

func main() {
urls := []string{
"http://www.example.com",
"http://www.example.net",
"http://www.example.net/foo",
"http://www.example.net/bar",
"http://www.example.net/baz",
}
s := semaphore.NewWeighted(Limit)
var w sync.WaitGroup
for _, u := range urls {
w.Add(1)
go func(u string) {
s.Acquire(context.Background(), Weight)
doSomething(u)
s.Release(Weight)
w.Done()
}(u)
}
w.Wait()

fmt.Println("All Done")
}

缓存网站并发访问结果

  • 使用var group singleflight.Group 缓存https://http2.golang.org/reqinfo网站并发访问结果。
  • 第一个go程成功访问https://http2.golang.org/reqinfo网站内容,后面排队中的并发goroutine不会真的访问,都是从缓存里面拿结果。
  • 注意: 要体会singleflight.Group和普通缓存的区别,可以把下面的go func(id int){},使用goroutine起回调函数和去除goroutine起有什么区别,你会发现goroutine起的函数会缓存,不用goroutine起的函数没有缓存,请仔细体会这点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"github.com/guonaihong/gout"
"golang.org/x/sync/singleflight"
"sync"
)

func main() {
var group singleflight.Group
var wg sync.WaitGroup

wg.Add(10)
defer wg.Wait()
for i := 0; i < 10; i++ {
go func(id int) {
defer wg.Done()

res, err, _ := group.Do("reqinfo", func() (interface{}, error) {
s := ""
err := gout.GET("https://http2.golang.org/reqinfo").Debug(true).BindBody(&s).Do()
if err != nil {
return nil, err
}
return s, nil
})

if err != nil {
fmt.Printf("fail:%s\n", err)
}

fmt.Printf("id(%d) ------>%s\n", id, res.(string))
}(i)
}
}

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package singlefilght

import "sync"

type Group struct {
mu sync.Mutex
m map[string]*Call // 对于每一个需要获取的key有一个对应的call
}

// call代表需要被执行的函数
type Call struct {
wg sync.WaitGroup // 用于阻塞这个调用call的其他请求
val interface{} // 函数执行后的结果
err error // 函数执行后的error
}

func (g *Group) Do(key string, fn func()(interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*Call)
}

// 如果获取当前key的函数正在被执行,则阻塞等待执行中的,等待其执行完毕后获取它的执行结果
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}

// 初始化一个call,往map中写后就解
c := new(Call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

// 执行获取key的函数,并将结果赋值给这个Call
c.val, c.err = fn()
c.wg.Done()

// 重新上锁删除key
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()

return c.val, c.err
}