原子性的解释如下:

一个或者多个操作在 CPU 执行的过程中不被中断的特性,称为原子性(atomicity) 。这些操作对外表现成一个不可分割的整体,他们要么都执行,要么都不执行,外界不会看到他们只执行到一半的状态。

CPU执行一系列操作时不可能不发生中断,但如果我们在执行多个操作时,能让他们的中间状态对外不可见,那我们就可以宣称他们拥有了”不可分割”的原子性。

Go 语言提供了哪些原子操作

Go语言通过内置包sync/atomic提供了对原子操作的支持,其提供的原子操作有以下几大类:

  • 增减,操作方法的命名方式为AddXXXType,保证对操作数进行原子的增减,支持的类型为int32int64uint32uint64uintptr,使用时以实际类型替换前面我说的XXXType就是对应的操作方法。
  • 载入,保证了读取到操作数前没有其他任务对它进行变更,操作方法的命名方式为LoadXXXType,支持的类型除了基础类型外还支持Pointer,也就是支持载入任何类型的指针。
  • 存储,有载入了就必然有存储操作,这类操作的方法名以Store开头,支持的类型跟载入操作支持的那些一样。
  • 比较并交换,也就是CAS (Compare And Swap),像Go的很多并发原语实现就是依赖的CAS操作,同样是支持上面列的那些类型。
  • 交换,这个简单粗暴一些,不比较直接交换,这个操作很少会用。

互斥锁跟原子操作的区别

平日里,在并发编程里,Go语言sync包里的同步原语Mutex是我们经常用来保证并发安全的,那么他跟atomic包里的这些操作有啥区别呢?在我看来他们在使用目的和底层实现上都不一样:

  • 使用目的:互斥锁是用来保护一段逻辑,原子操作用于对一个变量的更新保护
  • 底层实现:Mutex操作系统的调度器实现,而atomic包中的原子操作则由底层硬件指令直接提供支持,这些指令在执行的过程中是不允许中断的,因此原子操作可以在lock-free的情况下保证并发安全,并且它的性能也能做到随CPU个数的增多而线性扩展。

对于一个变量更新的保护,原子操作通常会更有效率,并且更能利用计算机多核的优势。

比如下面这个,使用互斥锁的并发计数器程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func mutexAdd() {
var a int32 = 0
var wg sync.WaitGroup
var mu sync.Mutex
start := time.Now()
for i := 0; i < 100000000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
a += 1
mu.Unlock()
}()
}
wg.Wait()
timeSpends := time.Now().Sub(start).Nanoseconds()
fmt.Printf("use mutex a is %d, spend time: %v\n", a, timeSpends)
}

Mutex改成用方法atomic.AddInt32(&a, 1)调用,在不加锁的情况下仍然能确保对变量递增的并发安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func AtomicAdd() {
var a int32 = 0
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt32(&a, 1)
}()
}
wg.Wait()
timeSpends := time.Now().Sub(start).Nanoseconds()
fmt.Printf("use atomic a is %d, spend time: %v\n", atomic.LoadInt32(&a), timeSpends)
}

需要注意的是,所有原子操作方法的被操作数形参必须是指针类型,通过指针变量可以获取被操作数在内存中的地址,从而施加特殊的CPU指令,确保同一时间只有一个goroutine能够进行操作

比较并交换

该操作简称CAS (Compare And Swap)。 这类操作的前缀为 CompareAndSwap :

1
2
3
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

该操作在进行交换前首先确保被操作数的值未被更改,即仍然保存着参数 old 所记录的值,满足此前提条件下才进行交换操作CAS的做法类似操作数据库时常见的乐观锁机制。

需要注意的是,当有大量的goroutine 对变量进行读写操作时,可能导致CAS操作无法成功,这时可以利用for循环多次尝试。

上面我只列出了比较典型的int32unsafe.Pointer类型的CAS方法,主要是想说除了读数值类型进行比较交换,还支持对指针进行比较交换。

unsafe.Pointer提供了绕过Go语言指针类型限制的方法,unsafe指的并不是说不安全,而是说官方并不保证向后兼容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 定义一个struct类型P
type P struct{ x, y, z int }

// 执行类型P的指针
var pP *P

func main() {

// 定义一个执行unsafe.Pointer值的指针变量
var unsafe1 = (*unsafe.Pointer)(unsafe.Pointer(&pP))

// Old pointer
var sy P

// 为了演示效果先将unsafe1设置成Old Pointer
px := atomic.SwapPointer(
unsafe1, unsafe.Pointer(&sy))

// 执行CAS操作,交换成功,结果返回true
y := atomic.CompareAndSwapPointer(unsafe1, unsafe.Pointer(&sy), px)

fmt.Println(y)
}

上面的示例并不是在并发环境下进行的CAS,只是为了演示效果,先把被操作数设置成了Old Pointer

其实Mutex的底层实现也是依赖原子操作中的CAS实现的,原子操作的atomic包相当于是sync包里的那些同步原语的实现依赖。

比如互斥锁Mutex的结构里有一个state字段,其是表示锁状态的状态位。

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

为了方便理解,我们在这里将它的状态定义为0和1,0代表目前该锁空闲,1代表已被加锁,以下是sync.MutexLock方法的部分实现代码。

1
2
3
4
5
6
7
8
9
10
11
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}

atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)中,m.state代表锁的状态,通过CAS方法,判断锁此时的状态是否空闲(m.state==0),是,则对其加锁(mutexLocked常量的值为1)。

atomic.Value保证任意值的读写安全

atomic包里提供了一套Store开头的方法,用来保证各种类型变量的并发写安全,避免其他操作读到了修改变量过程中的脏数据。

1
2
3
4
5
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

...

这些操作方法的定义与上面介绍的那些操作的方法类似。

值得一提的是如果你想要并发安全的设置一个结构体的多个字段,除了把结构体转换为指针,通过StorePointer设置外,还可以使用atomic包后来引入的atomic.Value,它在底层为我们完成了从具体指针类型到unsafe.Pointer之间的转换。

有了atomic.Value后,它使得我们可以不依赖于不保证兼容性的unsafe.Pointer类型,同时又能将任意数据类型的读写操作封装成原子性操作(中间状态对外不可见)。

atomic.Value类型对外暴露了两个方法:

  • v.Store(c) - 写操作,将原始的变量c存放到一个atomic.Value类型的v里。
  • c := v.Load() - 读操作,从线程安全的v中读取上一步存放的内容。

1.17 版本我看还增加了SwapCompareAndSwap方法。

简洁的接口使得它的使用也很简单,只需将需要做并发保护的变量读取和赋值操作用Load()Store()代替就行了。

由于Load()返回的是一个interface{}类型,所以在使用前我们记得要先转换成具体类型的值,再使用。下面是一个简单的例子演示atomic.Value的用法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type Rectangle struct {
length int
width int
}

var rect atomic.Value

func update(width, length int) {
rectLocal := new(Rectangle)
rectLocal.width = width
rectLocal.length = length
rect.Store(rectLocal)
}

func main() {
wg := sync.WaitGroup{}
wg.Add(10)
// 10 个协程并发更新
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
update(i, i+5)
}()
}
wg.Wait()
_r := rect.Load().(*Rectangle)
fmt.Printf("rect.width=%d\nrect.length=%d\n", _r.width, _r.length)
}

你也可以试试,不用atomic.Value,直接给Rectange类型的指针变量赋值,看看在并发条件下,两个字段的值是不是能跟预期的一样变成10和15。