背景介绍

程序并发运行时,多个 Go 协程同时访问修改共享资源的代码,这些修改共享资源的代码称为临界区。多个 GO 协程同时访问临界区会出现竞态条件,将会发生不可预知的错误。常见的解决方法就是使用同步语句进行限制,是在任意时刻只允许一个go协程访问临界区,避免出现竞态条件。互斥锁就是同步语句的一种实现。

常见用法

Go语言中,互斥锁是在sync包中,引入sync包即可创建一个互斥锁变量。

1
var m sync.Mutex

加锁解锁的操作也比较简单。

1
2
m.Lock()   // 加锁
m.Unlock() // 解锁

需要注意的是,解锁操作是不可重入的。如果连着两次进行解锁。会抛出相应的异常。

1
throw("sync: unlock of unlocked mutex")

在锁的使用过程中,还需要注意避免死锁。举例说明:

1
2
3
m.Lock()
m.Lock()
m.Unlock()

在第一步加锁完成后,第三步的解锁依赖第二步的加锁成功,而第二步的加锁依赖第三步的解锁成功,这就出现了死锁。Go在执行过程中,也会检测死锁,直接抛出fatal error: all goroutines are asleep - deadlock!异常。

来看一个完整的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main  
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock()
x = x + 1
m.Unlock()
wg.Done()
}
func main() {
var w sync.WaitGroup
var m sync.Mutex
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, &m)
}
w.Wait()
fmt.Println("final value of x", x)
}

原理剖析

metux底层结构

互斥锁底层,就是一个Mutex的结构体实现的。结构体内部两个字段stata字段和sema字段。stata字段用于标识锁状态,用于记录加锁状态、饥饿模式以及等待锁队列数量等。sema字段表示信号量,用于执行协程的唤醒和休眠。

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

metux状态

在上文有讲到标记锁状态的字段,下面我们详细看一下这个字段,该字段的字段为int32类型的。 将该字段进行二进制拆开,二进制位的标记如下:

可以看到从低位起的第一个二进制位是标记锁状态,当前标志位下,0代表未加锁,1代表已加锁。第二个二进制位标记是否存在一个活跃在内存中等待锁资源的goroutinue(自旋状态)。 第三个二进制位标记锁模式(正常模式和饥饿模式)。该状态下文详细讲解。除去一位正负标记位,其他28位为存储等待队列中的goroutinue数量

1
2
3
4
5
6
7
8
const (
mutexLocked = 1 << iota // 处于锁状态
mutexWoken // 唤醒状态
mutexStarving // 锁处于饥饿模式
mutexWaiterShift = iota // 等待锁的队列中的元素数量 state >> mutexWaiterShift 即为元素数量

starvationThresholdNs = 1e6 // 1ms = 1000 微秒 = 1000*1000 纳秒
)

互斥锁模式

互斥锁有以下两种模式:

  • 正常模式:所有等待锁的goroutine在一个先进先出的队列中,当锁释放的时候,如果内存中没有自旋中的goroutinue,会唤醒队首等待锁的goroutinue进行抢锁。如果抢锁失败,插入等待者队列头部等待下次唤醒。这里有一个问题就是在抢占锁比较频繁的场景下,内存中一直存在自旋状态下的goroutinue,这样就会存在队列中goroutinue一直没有得到调度,存在饿死的情况。于是,为了防止这种情况,就出现了饥饿模式。

  • 饥饿模式:饥饿模式触发条件队首的goroutinue被唤醒是,检测到等待时长超过1ms,将锁模式切换为饥饿模式。在饥饿模式下,在锁释放的goroutinue内部,直接将锁移交给等待队列中队首的goroutinue,然后在进行挂载唤醒,这样就避免出现饿死的情况。另外,队首的goroutinue往往是等待时间最长的goutinue,后面如果检测到等待时长小于1ms,切换为正常模式。

什么情况下会出现处于自旋态的Goroutinue?

首先说一下什么是自旋,当协程尝试抢占资源时,如果资源被其他协程占用,当前协程将循环等待获取资源,即此时协程处于自旋态。当前goroutinue尝试获取锁时,如果此时锁处于锁定状态,当前goroutinue不会立马加入到锁等待队列中,而且是将当前goroutinue自旋一段时间,在这段时间内,如果锁释放,当前goroutinue可以很快的响应,拿到锁。

协程自旋和协程挂起唤醒有什么本质不同?

协程自旋:协程是一直处于运行状态,不释放CPU资源,当等待的资源就位时避免唤醒操作,响应及时。
协程挂起唤醒:协程挂起释放CUP资源,当等待的资源就位时执行唤醒操作,CPU执行效率高,但响应不及时。

总体来说,自旋和挂起唤醒各有个的优点。

自旋可以尽可能的减少协/线程的阻塞,这对于锁竞争不激烈且占用锁时间非常短的代码块来说性能能大幅度的提升,因为自旋的消耗会小于协/线程阻塞挂起再唤醒的操作的消耗。

挂起唤醒可以提高CPU的利用率,对于锁竞争激烈或占用锁时间较长的代码来说,自旋锁在获取锁前一直都是占用cpu做无用功,同时有大量协/线程在竞争一个锁,会导致获取锁的时间很长,协/线程自旋的消耗大于协/线程阻塞挂起操作的消耗。

思考一下GO为什么要先尝试自旋获取,失败后在进行挂起唤醒操作?

Go将自旋和挂起唤醒的优点结合在一块了,正常锁竞争不激烈的情况下,锁自旋这段时间就可以等到锁,避免了挂起唤醒。而锁竞争激烈或其他协程占用锁时间较长的情况下,将当前协程挂起,释放CPU资源,提高CPU的利用率。

加锁逻辑

在详细的代码中,加锁逻辑比较复杂难以理解。但是一一拆分后,我们其实很容易就可以明白。

主要步骤如下:

  1. 尝试简易加锁,如果成功直接返回。简易加锁逻辑cas(state, 0, 1)

  2. 尝试进行自旋以及awoken标记位置位。如果锁状态处于锁定状态且非饥饿,标记唤醒状态位, 进行自旋操作。

  3. 有两种情况。

    a.锁已被释放,由于awoken的标记,不会去唤醒其他goroutinue强锁,进行加锁标记位置位,加锁成功,函数返回。
    b.锁未释放,等待锁数量加1,将当前goroutinue休眠,等待被唤醒。

  4. 当前goroutinue唤醒后,也会存在两种情况。

    a.当前锁状态为饥饿模式,即锁已经移交给当前goroutinue了,判断最新的饥饿状态,更新饥饿状态以及等待数量值减一,加锁成功,函数返回。
    b.当前锁未处于饥饿模式,回溯到步骤2。

相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // 快速获取锁
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}

var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// old&(mutexLocked|mutexStarving) == mutexLocked 锁定状态且非饥饿模式
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 等待中的gotinue 自旋状态
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { // 自旋过程中,如果不是处于唤醒状态且等待锁队列不为空,将锁状态置为唤醒状态
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}

new := old
if old&mutexStarving == 0 { // 饥饿状态下不进行抢锁
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 { // 锁定状态,或者饥饿模式下,等待者计数+1
new += 1 << mutexWaiterShift
}

// starving = true的情况,肯定是被唤醒的情况。
// 被唤醒的情况下,设置饥饿模式为什么要判断 old&mutexLocked != 0呢?
// 原因是old&mutexLocked == 0的情况下,再经过后面的cas是有可能直接拿到锁的,这样的情况下就不设置饥饿模式了。如果后面cas 拿锁失败,在进行一遍循环,还是会设置饥饿模式。

// 只有被唤醒,已经饥饿但锁还被其他人获取了,才设置饥饿模式。
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // awoken标记位置为空
}
if atomic.CompareAndSwapInt32(&m.state, old, new) { // 旧状态在此期间未改变的情况下,将新的状态更新到m.state

if old&(mutexLocked|mutexStarving) == 0 { // 如果旧状态未持有锁且处于非饥饿模式,执行到这里就已经获取到锁了。
break // locked the mutex with CAS
}

queueLifo := waitStartTime != 0 // 如果这里waitStartTime不等于0, 可以证明当前goroutinue是队首唤醒的goroutinue
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}

// 加入等待者队列,queueLifo=true放队首,否则反之。如果是队首唤醒的goroutinue的就放在队首。
runtime_SemacquireMutex(&m.sema, queueLifo)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 调度回来后,判断是否达到饥饿状态
old = m.state
if old&mutexStarving != 0 { // 被唤醒时饥饿模式(已经拿到锁了)。1. 等待者计数减1, 2. 判断是否仍处于饥饿状态,更新最新的饥饿状态
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 { // 最新非饥饿模式,或者旧的等待者计数为1(只剩自己,无其他等待者)
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta) // 更新状态
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
s := acquireSudog() // 获取一个新的goroutinue
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lock(&root.lock) // 上锁
atomic.Xadd(&root.nwait, 1) // 等待者计数+1
if cansemacquire(addr) { // 简易模式,存在锁资源,直接获取锁资源 返回
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
root.queue(addr, s, lifo) // 加入等待者队列
goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4) // gopark阻塞,待调度
if s.ticket != 0 || cansemacquire(addr) { // 持有钥匙,或者获取到锁资源,直接break
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3)
}
releaseSudog(s)
}

解锁逻辑

解锁逻辑相对于加锁来说是比较简单的。主要步骤如下:

  1. 锁标记位置为0
  2. 如果当前处于等待队列数量不为0且非饥饿非锁定状态且不存在自旋中的goroutinue,唤醒队首处于休眠状态的goroutinue
  3. 如果处于饥饿状态,执行锁的移交操作,这里是依赖信号量的实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
func (m *Mutex) Unlock() {

new := atomic.AddInt32(&m.state, -mutexLocked) // 更新标记位(这时候,其他自旋状态下的goroutinue可以抢锁了)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// 等待着数量为0,或者当前锁状态处于 被锁(其他goutinue抢锁),唤醒状态(已经唤醒其他gourinue 或 处于自旋状态下将mutexWoken置位了),饥饿(饥饿模式下,不走唤醒操作) 直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken // 等待者计数-1,唤醒标记为置1
if atomic.CompareAndSwapInt32(&m.state, old, new) { // 更新状态
runtime_Semrelease(&m.sema, false) // 执行唤醒操作
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true) // 饥饿模式,在当前goutinue直接将锁资源给待唤醒的goutinue
}
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool) {
semrelease1(addr, handoff)
}

func semrelease1(addr *uint32, handoff bool) {
root := semroot(addr)
atomic.Xadd(addr, 1) // 资源为mutex.sema 自增

if atomic.Load(&root.nwait) == 0 { // 等待计数为0,返回
return
}

lock(&root.lock)
if atomic.Load(&root.nwait) == 0 { // 再次判断计数
unlock(&root.lock)
return
}
s, t0 := root.dequeue(addr) // 取出队首goroutinue
if s != nil {
atomic.Xadd(&root.nwait, -1) // 等待计数-1
}
unlock(&root.lock)
if s != nil { // May be slow, so unlock first
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
// handoff 标记是否是饥饿模式,饥饿模式下待唤醒的goutinue直接拿到钥匙(唤醒后不与内存中自旋的goutinue竞争)
// cansemacquire函数 是用来获取到锁资源,这时候其他goroutinue拿不到锁
// 这里其实饥饿模式下,在goroutinue未唤醒的情况下,就已经将锁资源给了待唤醒的goutinue
if handoff && cansemacquire(addr) {
s.ticket = 1
}
readyWithTime(s, 5) // 执行唤醒操作
}
}

进阶理解

  1. 程序优化时,在原子性自增与互斥锁之间,可以不用考虑原子性自增,直接使用互斥锁了,互斥锁的底层实现就是原子性自增实现的。
  2. 在读多写少的场景下,建议使用读写锁sync.RWMetux
  3. 在临界区入口加锁完成后,直接进行defer解锁操作。