尽管 Golang 推荐通过 channel 进行通信和同步,但在实际开发中 sync 包用得也非常的多。另外 sync 下还有一个 atomic 包,提供了一些底层的原子操作
整个包都围绕这 Locker 进行,这是一个 interface:
1 2 3 4 type Locker interface { Lock() Unlock() }
sync 互斥锁 Mutex 互斥锁是锁的一种具体实现,有两个方法:
1 2 func (m *Mutex) Lock () func (m *Mutex) Unlock ()
在首次使用后不要复制该互斥锁。对一个未锁定的互斥锁解锁将会产生运行时错误。
一个互斥锁只能同时被一个 goroutine 锁定,其它 goroutine 将阻塞直到互斥锁被解锁 (重新争抢对互斥锁的锁定)。如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package mainimport ( "fmt" "sync" "time" ) func main () { ch := make (chan struct {}, 2 ) var l sync.Mutex go func () { l.Lock() defer l.Unlock() fmt.Println("goroutine1: 我会锁定大概 2s" ) time.Sleep(time.Second * 2 ) fmt.Println("goroutine1: 我解锁了,你们去抢吧" ) ch <- struct {}{} }() go func () { fmt.Println("groutine2: 等待解锁" ) l.Lock() defer l.Unlock() fmt.Println("goroutine2: 哈哈,我锁定了" ) ch <- struct {}{} }() for i := 0 ; i < 2 ; i++ { <-ch } }
注意,平时所说的锁定,其实就是去锁定互斥锁,而不是说去锁定一段代码 。
也就是说,当代码执行到有锁的地方时,它获取不到互斥锁的锁定,会阻塞在那里,从而达到控制同步的目的。
读写锁 RWMutex 读写锁是针对读写操作的互斥锁,读写锁与互斥锁最大的不同就是可以分别对 读
、写
进行锁定。一般用在大量读操作、少量写操作的情况:
1 2 3 4 5 func (rw *RWMutex) Lock () func (rw *RWMutex) Unlock () func (rw *RWMutex) RLock () func (rw *RWMutex) RUnlock ()
由于这里需要区分读写锁定,我们这样定义:
读锁定(RLock),对读操作进行锁定
读解锁(RUnlock),对读锁定进行解锁
写锁定(Lock),对写操作进行锁定
写解锁(Unlock),对写锁定进行解锁
在首次使用之后,不要复制该读写锁。不要混用锁定和解锁,如:Lock 和 RUnlock、RLock 和 Unlock。因为对未读锁定的读写锁进行读解锁或对未写锁定的读写锁进行写解锁将会引起运行时错误。
如何理解读写锁呢?
同时只能有一个 goroutine 能够获得写锁定。
同时可以有任意多个 gorouinte 获得读锁定。
同时只能存在写锁定或读锁定(读和写互斥)。
也就是说:
当有一个 goroutine 获得写锁定,其它无论是读锁定还是写锁定都将阻塞直到写解锁;
当有一个 goroutine 获得读锁定,其它读锁定任然可以继续;
当有一个或任意多个读锁定,写锁定将等待所有 读锁定解锁之后才能够进行写锁定。
所以说这里的读锁定(RLock)目的其实是告诉写锁定:有很多人正在读取数据,你给我站一边去,等它们读(读解锁)完你再来写(写锁定)。
使用例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package mainimport ( "fmt" "math/rand" "sync" ) var count int var rw sync.RWMutexfunc main () { ch := make (chan struct {}, 10 ) for i := 0 ; i < 5 ; i++ { go read(i, ch) } for i := 0 ; i < 5 ; i++ { go write(i, ch) } for i := 0 ; i < 10 ; i++ { <-ch } } func read (n int , ch chan struct {}) { rw.RLock() fmt.Printf("goroutine %d 进入读操作...\n" , n) v := count fmt.Printf("goroutine %d 读取结束,值为:%d\n" , n, v) rw.RUnlock() ch <- struct {}{} } func write (n int , ch chan struct {}) { rw.Lock() fmt.Printf("goroutine %d 进入写操作...\n" , n) v := rand.Intn(1000 ) count = v fmt.Printf("goroutine %d 写入结束,新值为:%d\n" , n, v) rw.Unlock() ch <- struct {}{} }
goroutine等待组 WaitGroup WaitGroup 用于等待一组 goroutine 结束,用法很简单。它有三个方法:
1 2 3 func (wg *WaitGroup) Add (delta int ) func (wg *WaitGroup) Done () func (wg *WaitGroup) Wait ()
Add 用来添加 goroutine 的个数。Done 执行一次数量减 1。Wait 用来等待结束:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package mainimport ( "fmt" "sync" "time" ) func main () { var wg sync.WaitGroup for i := 0 ; i < 5 ; i++ { wg.Add(1 ) go func (i int ) { defer wg.Done() time.Sleep(time.Second * time.Duration(i)) fmt.Printf("goroutine%d 结束\n" , i) }(i) } wg.Wait() fmt.Println("所有 goroutine 执行结束" ) }
注意,wg.Add()
方法一定要在 goroutine 开始前执行哦。
条件变量 Cond Cond 实现一个条件变量,即等待或宣布事件发生的 goroutines 的会合点,它会保存一个通知列表。基本思想是当某中状态达成,goroutine 将会等待(Wait)在那里,当某个时刻状态改变时通过通知的方式(Broadcast,Signal)的方式通知等待的 goroutine。这样,不满足条件的 goroutine 唤醒继续向下执行,满足条件的重新进入等待序列。
1 2 3 4 5 6 7 8 9 10 11 12 13 type Cond struct { noCopy noCopy L Locker notify notifyList checker copyChecker } func NewCond (l Locker) *Cond func (c *Cond) Broadcast () func (c *Cond) Signal () func (c *Cond) Wait ()
Wait 方法、Signal 方法和 Broadcast 方法。它们分别代表了等待通知、单发通知和广播通知的操作。
我们来看一下 Wait 方法:
1 2 3 4 5 6 7 func (c *Cond) Wait () { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify, t) c.L.Lock() }
它的操作为:加入到通知列表 -> 解锁 L -> 等待通知 -> 锁定 L。其使用方法是:
1 2 3 4 5 6 c.L.Lock() for !condition() { c.Wait() } ... make use of condition ... c.L.Unlock()
举个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 package mainimport ( "fmt" "sync" "time" ) var count int = 4 func main () { ch := make (chan struct {}, 5 ) var l sync.Mutex cond := sync.NewCond(&l) for i := 0 ; i < 5 ; i++ { go func (i int ) { cond.L.Lock() defer func () { cond.L.Unlock() ch <- struct {}{} }() for count > i { cond.Wait() fmt.Printf("收到一个通知 goroutine%d\n" , i) } fmt.Printf("goroutine%d 执行结束\n" , i) }(i) } time.Sleep(time.Millisecond * 20 ) fmt.Println("broadcast..." ) cond.L.Lock() count -= 1 cond.Broadcast() cond.L.Unlock() time.Sleep(time.Second) fmt.Println("signal..." ) cond.L.Lock() count -= 2 cond.Signal() cond.L.Unlock() time.Sleep(time.Second) fmt.Println("broadcast..." ) cond.L.Lock() count -= 1 cond.Broadcast() cond.L.Unlock() for i := 0 ; i < 5 ; i++ { <-ch } }
执行一次 Once 使用 sync.Once
对象可以使得函数多次调用只执行一次。其结构为:
1 2 3 4 5 6 type Once struct { m Mutex done uint32 } func (o *Once) Do (f func () )
用 done 来记录执行次数,用 m 来保证保证仅被执行一次。只有一个 Do 方法,调用执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package mainimport ( "fmt" "sync" ) func main () { var once sync.Once onceBody := func () { fmt.Println("Only once" ) } done := make (chan bool ) for i := 0 ; i < 10 ; i++ { go func () { once.Do(onceBody) done <- true }() } for i := 0 ; i < 10 ; i++ { <-done } } # 打印结果 Only once
临时对象池 Pool sync.Pool
可以作为临时对象的保存和复用 的集合。
个人觉得它的名字有一定的误导性,因为 Pool 里装的对象可以被无通知地被回收 ,可能 sync.Cache
是一个更合适的名字。
作用:
对于很多需要重复分配、回收内存的地方,sync.Pool
是一个很好的选择。
频繁地分配、回收内存会给 GC 带来一定的负担,严重的时候会引起 CPU 的毛刺,
而 sync.Pool
可以将暂时不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存 ,减轻 GC 的压力,提升系统的性能。
首先,sync.Pool
是协程安全的,这对于使用者来说是极其方便的。使用前,设置好对象的 New
函数,用于在 Pool
里没有缓存的对象时,创建一个。之后,在程序的任何地方、任何时候仅通过 Get()
、Put()
方法就可以取、还对象了。 因此关键思想就是对象的复用,避免重复创建、销毁。
适用场景:
当多个 goroutine 都需要创建同⼀个对象的时候,如果 goroutine 数过多,导致对象的创建数⽬剧增,进⽽导致 GC 压⼒增大。形成 “并发⼤-占⽤内存⼤-GC 缓慢-处理并发能⼒降低-并发更⼤”这样的恶性循环。
在这个时候,需要有⼀个对象池,每个 goroutine 不再⾃⼰单独创建对象,⽽是从对象池中获取出⼀个对象(如果池中已经有的话)。
其结构为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type Pool struct { noCopy noCopy local unsafe.Pointer localSize uintptr New func () interface {} } func (p *Pool) Get () interface {}func (p *Pool) Put (x interface {})
新建 Pool 需要提供一个 New 方法,目的是当获取不到临时对象时自动创建一个(不会主动加入到 Pool 中),Get 和 Put 方法都很好理解。
深入了解过 Go 的同学应该知道,Go 的重要组成结构为 M、P、G。
Pool 实际上会为每一个操作它的 goroutine 相关联的 P 都生成一个本地池。如果从本地池 Get 对象的时候,本地池没有,则会从其它的 P 本地池获取。因此,Pool 的一个特点就是:可以把由其中的对象值产生的存储压力进行分摊。
它有着以下特点:
Pool 中的对象在仅有 Pool 有着唯一索引的情况下可能会被自动删除(取决于下一次 GC 执行的时间)。
goroutines 协程安全,可以同时被多个协程使用 。
GC 的执行一般会使 Pool 中的对象全部移除。
那么 Pool 都适用于什么场景呢?从它的特点来说,适用与无状态的对象的复用,而不适用与如连接池之类的。在 fmt 包中有一个很好的使用池的例子,它维护一个动态大小的临时输出缓冲区。
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package mainimport ( "fmt" "sync" ) var pool *sync.Pooltype Person struct { Name string } func initPool () { pool = &sync.Pool { New: func () interface {} { fmt.Println("Creating a new Person" ) return new (Person) }, } } func main () { initPool() p := pool.Get().(*Person) fmt.Println("首次从 pool 里获取:" , p) p.Name = "first" fmt.Printf("设置 p.Name = %s\n" , p.Name) pool.Put(p) fmt.Println("Pool 里已有一个对象:&{first},调用 Get: " , pool.Get().(*Person)) fmt.Println("Pool 没有对象了,调用 Get: " , pool.Get().(*Person)) }
安全映射 sync.Map 在Go 1.6之前, 内置的map类型是部分goroutine安全的,并发的读没有问题,并发的写可能有问题。自go 1.6之后, 并发地读写map会报错,这在一些知名的开源库中都存在这个问题,所以go 1.9之前的解决方案是额外绑定一个锁,封装成一个新的struct或者单独使用锁都可以。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func main () { var counter = struct { sync.RWMutex m map [string ]int }{m: make (map [string ]int )} counter.RLock() n := counter.m["some_key" ] counter.RUnlock() fmt.Println("some_key:" , n) counter.Lock() counter.m["some_key" ]++ counter.Unlock() }
sync.Map
Store
LoadOrStore
Load
Delete
Range
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package main import ( "fmt" "sync" ) func main () { var m sync.Map m.Store(1 ,"a" ) m.Store(2 ,"b" ) v,ok := m.LoadOrStore("1" ,"aaa" ) fmt.Println(ok,v) v,ok = m.LoadOrStore(1 ,"aaa" ) fmt.Println(ok,v) v,ok = m.Load(1 ) if ok{ fmt.Println("it's an existing key,value is " ,v) } else { fmt.Println("it's an unknown key" ) } f := func (k, v interface {}) bool { fmt.Println(k,v) return true } m.Range(f) m.Delete(1 ) fmt.Println(m.Load(1 )) }
sync.atomic 功能:提供的原子操作
原子操作是比其它同步技术更基础的操作。原子操作是无锁的,常常直接通过CPU指令直接实现 。 事实上,其它同步技术的实现常常依赖于原子操作。
Go支持的原子操作概述 对于一个整数类型T
,sync/atomic
标准库包提供了下列原子操作函数。 其中T
可以是内置int32
、int64
、uint32
、uint64
和uintptr
类型。
1 2 3 4 5 func AddT (addr *T, delta T) (new T) func LoadT (addr *T) (val T) func StoreT (addr *T, val T) func SwapT (addr *T, new T) (old T) func CompareAndSwapT (addr *T, old, new T) (swapped bool )
比如,下列五个原子操作函数提供给了内置int32
类型。
1 2 3 4 5 func AddInt32 (addr *int32 , delta int32 ) (new int32 ) func LoadInt32 (addr *int32 ) (val int32 ) func StoreInt32 (addr *int32 , val int32 ) func SwapInt32 (addr *int32 , new int32 ) (old int32 ) func CompareAndSwapInt32 (addr *int32 , old, new int32 ) (swapped bool )
下列四个原子操作函数提供给了(安全)指针类型。因为Go目前(1.16)并不支持自定义泛型,所以这些函数是通过非类型安全指针 unsafe.Pointer
来实现的。
1 2 3 4 func LoadPointer (addr *unsafe.Pointer) (val unsafe.Pointer) func StorePointer (addr *unsafe.Pointer, val unsafe.Pointer) func SwapPointer (addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) func CompareAndSwapPointer (addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool )
因为Go指针不支持算术运算,所以相对于整数类型,指针类型的原子操作少了一个AddPointer
函数。
sync/atomic
标准库包也提供了一个Value
类型。以它为基的指针类型*Value
拥有两个方法:Load
和Store
。 Value
值用来原子读取和修改任何类型的Go值。
1 2 func (v *Value) Load () (x interface {}) func (v *Value) Store (x interface {})
下面部分将通过一些示例来展示如何使用这些原子操作函数。
整数原子操作 下面这个例子展示了如何使用add
原子操作来并发地递增一个int32
值。
在此例子中,主协程中创建了1000个新协程。每个新协程将整数n
的值增加1
。 原子操作保证这1000个新协程之间不会发生数据竞争。此程序肯定打印出1000
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package mainimport ( "fmt" "sync" "sync/atomic" ) func main () { var n int32 var wg sync.WaitGroup for i := 0 ; i < 1000 ; i++ { wg.Add(1 ) go func () { atomic.AddInt32(&n, 1 ) wg.Done() }() } wg.Wait() fmt.Println(atomic.LoadInt32(&n)) }
如果我们将新协程中的语句atomic.AddInt32(&n, 1)
替换为n++
,则最后的输出结果很可能不是1000
。
StoreT
和LoadT
原子操作函数经常被用来需要并发运行的实现setter和getter方法。下面是一个这样的例子:
1 2 3 4 5 6 7 8 9 10 11 type Page struct { views uint32 } func (page *Page) SetViews (n uint32 ) { atomic.StoreUint32(&page.views, n) } func (page *Page) Views () uint32 { return atomic.LoadUint32(&page.views) }
如果T
是一个有符号整数类型,比如int32
或int64
,则AddT
函数调用的第二个实参可以是一个负数,用来实现原子减法操作。
但是如果T
是一个无符号整数类型,比如uint32
、uint64
或者uintptr
,则AddT
函数调用的第二个实参需要为一个非负数,那么如何实现无符号整数类型T
值的原子减法操作呢? 毕竟sync/atomic
标准库包没有提供SubstractT
函数。 根据欲传递的第二个实参的特点,我们可以把T
为一个无符号整数类型的情况细分为两类:
第二个实参为类型为T
的一个变量值v
。 因为-v
在Go中是合法的,所以-v
可以直接被用做AddT
调用的第二个实参。
第二个实参为一个正整数常量c
,这时-c
在Go中是编译不通过的,所以它不能被用做AddT
调用的第二个实参。 这时我们可以使用^T(c-1)
(仍为一个正数)做为AddT
调用的第二个实参。
此^T(v-1)
小技巧对于无符号类型的变量v
也是适用的,但是^T(v-1)
比T(-v)
的效率要低。
对于这个^T(c-1)
小技巧,如果c
是一个类型确定值并且它的类型确实就是T
,则它的表示形式可以简化为^(c-1)
。
一个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package mainimport ( "fmt" "sync/atomic" ) func main () { var ( n uint64 = 97 m uint64 = 1 k int = 2 ) const ( a = 3 b uint64 = 4 c uint32 = 5 d int = 6 ) show := fmt.Println atomic.AddUint64(&n, -m) show(n) atomic.AddUint64(&n, -uint64 (k)) show(n) atomic.AddUint64(&n, ^uint64 (a - 1 )) show(n) atomic.AddUint64(&n, ^(b - 1 )) show(n) atomic.AddUint64(&n, ^uint64 (c - 1 )) show(n) atomic.AddUint64(&n, ^uint64 (d - 1 )) show(n) x := b; atomic.AddUint64(&n, -x) show(n) atomic.AddUint64(&n, ^(m - 1 )) show(n) atomic.AddUint64(&n, ^uint64 (k - 1 )) show(n) }
SwapT
函数调用和StoreT
函数调用类似,但是返回修改之前的旧值(因此称为置换操作)。
一个CompareAndSwapT
函数调用仅在新值和旧值不相等的情况下才会执行修改操作,并返回true
;否则立即返回false
。
一个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package mainimport ( "fmt" "sync/atomic" ) func main () { var n int64 = 123 var old = atomic.SwapInt64(&n, 789 ) fmt.Println(n, old) swapped := atomic.CompareAndSwapInt64(&n, 123 , 456 ) fmt.Println(swapped) fmt.Println(n) swapped = atomic.CompareAndSwapInt64(&n, 789 , 456 ) fmt.Println(swapped) fmt.Println(n) }
指针值的原子操作 上面已经提到了sync/atomic
标准库包为指针值的原子操作提供了四个函数,并且指针值的原子操作是通过非类型安全指针来实现的。
从非类型安全指针 一文,我们得知,在Go中, 任何指针类型的值可以被显式转换为非类型安全指针类型unsafe.Pointer
,反之亦然。 所以指针类型*unsafe.Pointer
的值也可以被显式转换为类型unsafe.Pointer
,反之亦然。
下面这个程序不是一个并发程序。它仅仅展示了如何使用指针原子操作。在这个例子中,类型T
可以为任何类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package mainimport ( "fmt" "sync/atomic" "unsafe" ) type T struct {x int }var pT *Tfunc main () { var unsafePPT = (*unsafe.Pointer)(unsafe.Pointer(&pT)) var ta, tb = T{1 }, T{2 } atomic.StorePointer(unsafePPT, unsafe.Pointer(&ta)) fmt.Println(pT) pa1 := (*T)(atomic.LoadPointer(unsafePPT)) fmt.Println(pa1 == &ta) pa2 := atomic.SwapPointer(unsafePPT, unsafe.Pointer(&tb)) fmt.Println((*T)(pa2) == &ta) fmt.Println(pT) b := atomic.CompareAndSwapPointer(unsafePPT, pa2, unsafe.Pointer(&tb)) fmt.Println(b) b = atomic.CompareAndSwapPointer(unsafePPT, unsafe.Pointer(&tb), pa2) fmt.Println(b) }
是的,目前指针的原子操作使用起来是相当的啰嗦。 事实上,啰嗦还是次要的,更主要的是,因为指针的原子操作需要引入unsafe
标准库包,所以这些操作函数不在Go 1兼容性保证 之列。
如果你确实担忧这些指针原子操作在未来的合法性,你可以使用下一节将要介绍的原子操作。 但是下一节将要介绍的原子操作对于指针值来说比本节介绍的指针原子操作效率要低得多。
任何类型值的原子操作 sync/atomic
标准库包中提供的Value
类型可以用来读取和修改任何类型的值。
类型*Value
有几个方法:Load
、Store
、Swap
和CompareAndSwap
(其中后两个方法实在Go 1.17中引入的)。 这些方法均以interface{}
做为参数类型,所以传递给它们的实参可以是任何类型的值。 但是对于一个可寻址的Value
类型的值v
,一旦v.Store
方法((&v).Store
的简写形式)被曾经调用一次,则传递给值v
的后续方法调用的实参的具体类型必须和传递给它的第一次调用的实参的具体类型一致; 否则,将产生一个恐慌。nil
接口类型实参也将导致v.Store()
方法调用产生恐慌。
一个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package mainimport ( "fmt" "sync/atomic" ) func main () { type T struct {a, b, c int } var ta = T{1 , 2 , 3 } var v atomic.Value v.Store(ta) var tb = v.Load().(T) fmt.Println(tb) fmt.Println(ta == tb) v.Store("hello" ) }
另一个例子(针对Go 1.17+):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package mainimport ( "fmt" "sync/atomic" ) func main () { type T struct {a, b, c int } var x = T{1 , 2 , 3 } var y = T{4 , 5 , 6 } var z = T{7 , 8 , 9 } var v atomic.Value v.Store(x) fmt.Println(v) old := v.Swap(y) fmt.Println(v) fmt.Println(old.(T)) swapped := v.CompareAndSwap(x, z) fmt.Println(swapped, v) swapped = v.CompareAndSwap(y, z) fmt.Println(swapped, v) }
事实上,我们也可以使用上一节介绍的指针原子操作来对任何类型的值进行原子读取和修改,不过需要多一级指针的间接引用。 两种方法有各自的好处和缺点。在实践中需要根据具体需要选择合适的方法。