背景介绍 程序并发运行时,多个 Go 协程同时访问修改共享资源的代码,这些修改共享资源的代码称为临界区。多个 GO 协程同时访问临界区会出现竞态条件,将会发生不可预知的错误。常见的解决方法就是使用同步语句进行限制,是在任意时刻只允许一个go协程访问临界区,避免出现竞态条件。互斥锁就是同步语句的一种实现。
常见用法 Go语言中,互斥锁是在sync
包中,引入sync
包即可创建一个互斥锁变量。
加锁解锁的操作也比较简单。
需要注意的是,解锁操作是不可重入的。如果连着两次进行解锁。会抛出相应的异常。
1 throw("sync: unlock of unlocked mutex" )
在锁的使用过程中,还需要注意避免死锁。举例说明:
1 2 3 m.Lock() m.Lock() m.Unlock()
在第一步加锁完成后,第三步的解锁依赖第二步的加锁成功,而第二步的加锁依赖第三步的解锁成功,这就出现了死锁。Go在执行过程中,也会检测死锁,直接抛出fatal error: all goroutines are asleep - deadlock!
异常。
来看一个完整的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package main import ( "fmt" "sync" ) var x = 0 func increment (wg *sync.WaitGroup, m *sync.Mutex) { m.Lock() x = x + 1 m.Unlock() wg.Done() } func main () { var w sync.WaitGroup var m sync.Mutex for i := 0 ; i < 1000 ; i++ { w.Add(1 ) go increment(&w, &m) } w.Wait() fmt.Println("final value of x" , x) }
原理剖析 metux底层结构 互斥锁底层,就是一个Mutex
的结构体实现的。结构体内部两个字段stata
字段和sema
字段。stata
字段用于标识锁状态,用于记录加锁状态、饥饿模式以及等待锁队列数量等。sema
字段表示信号量,用于执行协程的唤醒和休眠。
1 2 3 4 type Mutex struct { state int32 sema uint32 }
metux状态 在上文有讲到标记锁状态的字段,下面我们详细看一下这个字段,该字段的字段为int32
类型的。 将该字段进行二进制拆开,二进制位的标记如下:
可以看到从低位起的第一个二进制位是标记锁状态,当前标志位下,0代表未加锁,1代表已加锁。第二个二进制位标记是否存在一个活跃在内存中等待锁资源的goroutinue(自旋状态)。 第三个二进制位标记锁模式(正常模式和饥饿模式)。该状态下文详细讲解。除去一位正负标记位,其他28位为存储等待队列中的goroutinue数量
1 2 3 4 5 6 7 8 const ( mutexLocked = 1 << iota mutexWoken mutexStarving mutexWaiterShift = iota starvationThresholdNs = 1e6 )
互斥锁模式 互斥锁有以下两种模式:
正常模式:所有等待锁的goroutine在一个先进先出的队列中,当锁释放的时候,如果内存中没有自旋中的goroutinue,会唤醒队首等待锁的goroutinue进行抢锁。如果抢锁失败,插入等待者队列头部等待下次唤醒。这里有一个问题就是在抢占锁比较频繁的场景下,内存中一直存在自旋状态下的goroutinue,这样就会存在队列中goroutinue一直没有得到调度,存在饿死的情况。于是,为了防止这种情况,就出现了饥饿模式。
饥饿模式:饥饿模式触发条件队首的goroutinue被唤醒是,检测到等待时长超过1ms,将锁模式切换为饥饿模式。在饥饿模式下,在锁释放的goroutinue内部,直接将锁移交给等待队列中队首的goroutinue,然后在进行挂载唤醒,这样就避免出现饿死的情况。另外,队首的goroutinue往往是等待时间最长的goutinue,后面如果检测到等待时长小于1ms,切换为正常模式。
什么情况下会出现处于自旋态的Goroutinue?
首先说一下什么是自旋,当协程尝试抢占资源时,如果资源被其他协程占用,当前协程将循环等待获取资源,即此时协程处于自旋态。当前goroutinue尝试获取锁时,如果此时锁处于锁定状态,当前goroutinue不会立马加入到锁等待队列中,而且是将当前goroutinue自旋一段时间,在这段时间内,如果锁释放,当前goroutinue可以很快的响应,拿到锁。
协程自旋和协程挂起唤醒有什么本质不同?
协程自旋:协程是一直处于运行状态,不释放CPU资源,当等待的资源就位时避免唤醒操作,响应及时。 协程挂起唤醒:协程挂起释放CUP资源,当等待的资源就位时执行唤醒操作,CPU执行效率高,但响应不及时。
总体来说,自旋和挂起唤醒各有个的优点。
自旋可以尽可能的减少协/线程的阻塞,这对于锁竞争不激烈且占用锁时间非常短 的代码块来说性能能大幅度的提升,因为自旋的消耗会小于协/线程阻塞挂起再唤醒的操作的消耗。
挂起唤醒可以提高CPU的利用率,对于锁竞争激烈或占用锁时间较长 的代码来说,自旋锁在获取锁前一直都是占用cpu做无用功,同时有大量协/线程在竞争一个锁,会导致获取锁的时间很长,协/线程自旋的消耗大于协/线程阻塞挂起操作的消耗。
思考一下GO为什么要先尝试自旋获取,失败后在进行挂起唤醒操作?
Go将自旋和挂起唤醒的优点结合在一块了,正常锁竞争不激烈的情况下,锁自旋这段时间就可以等到锁,避免了挂起唤醒。而锁竞争激烈或其他协程占用锁时间较长的情况下,将当前协程挂起,释放CPU资源,提高CPU的利用率。
加锁逻辑 在详细的代码中,加锁逻辑比较复杂难以理解。但是一一拆分后,我们其实很容易就可以明白。
主要步骤如下:
尝试简易加锁,如果成功直接返回。简易加锁逻辑cas(state, 0, 1)
尝试进行自旋以及awoken标记位置位。如果锁状态处于锁定状态且非饥饿,标记唤醒状态位, 进行自旋操作。
有两种情况。
a.锁已被释放,由于awoken的标记,不会去唤醒其他goroutinue强锁,进行加锁标记位置位,加锁成功,函数返回。 b.锁未释放,等待锁数量加1,将当前goroutinue休眠,等待被唤醒。
当前goroutinue唤醒后,也会存在两种情况。
a.当前锁状态为饥饿模式,即锁已经移交给当前goroutinue了,判断最新的饥饿状态,更新饥饿状态以及等待数量值减一,加锁成功,函数返回。 b.当前锁未处于饥饿模式,回溯到步骤2。
相关代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0 , mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new &mutexWoken == 0 { throw("sync: inconsistent mutex state" ) } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new ) { if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state" ) } delta := int32 (mutexLocked - 1 <<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } } func sync_runtime_SemacquireMutex (addr *uint32 , lifo bool ) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile) } func semacquire1 (addr *uint32 , lifo bool , profile semaProfileFlags) { gp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack" ) } s := acquireSudog() root := semroot(addr) t0 := int64 (0 ) s.releasetime = 0 s.acquiretime = 0 s.ticket = 0 if profile&semaBlockProfile != 0 && blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if profile&semaMutexProfile != 0 && mutexprofilerate > 0 { if t0 == 0 { t0 = cputicks() } s.acquiretime = t0 } for { lock(&root.lock) atomic.Xadd(&root.nwait, 1 ) if cansemacquire(addr) { atomic.Xadd(&root.nwait, -1 ) unlock(&root.lock) break } root.queue(addr, s, lifo) goparkunlock(&root.lock, "semacquire" , traceEvGoBlockSync, 4 ) if s.ticket != 0 || cansemacquire(addr) { break } } if s.releasetime > 0 { blockevent(s.releasetime-t0, 3 ) } releaseSudog(s) }
解锁逻辑 解锁逻辑相对于加锁来说是比较简单的。主要步骤如下:
锁标记位置为0
如果当前处于等待队列数量不为0且非饥饿非锁定状态且不存在自旋中的goroutinue,唤醒队首处于休眠状态的goroutinue
如果处于饥饿状态,执行锁的移交操作,这里是依赖信号量的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 func (m *Mutex) Unlock() { new := atomic.AddInt32(&m.state, -mutexLocked) if (new +mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex" ) } if new &mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1 <<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new ) { runtime_Semrelease(&m.sema, false ) return } old = m.state } } else { runtime_Semrelease(&m.sema, true ) } } func sync_runtime_Semrelease (addr *uint32 , handoff bool ) { semrelease1(addr, handoff) } func semrelease1 (addr *uint32 , handoff bool ) { root := semroot(addr) atomic.Xadd(addr, 1 ) if atomic.Load(&root.nwait) == 0 { return } lock(&root.lock) if atomic.Load(&root.nwait) == 0 { unlock(&root.lock) return } s, t0 := root.dequeue(addr) if s != nil { atomic.Xadd(&root.nwait, -1 ) } unlock(&root.lock) if s != nil { acquiretime := s.acquiretime if acquiretime != 0 { mutexevent(t0-acquiretime, 3 ) } if s.ticket != 0 { throw("corrupted semaphore ticket" ) } if handoff && cansemacquire(addr) { s.ticket = 1 } readyWithTime(s, 5 ) } }
进阶理解
程序优化时,在原子性自增与互斥锁之间,可以不用考虑原子性自增,直接使用互斥锁了,互斥锁的底层实现就是原子性自增实现的。
在读多写少的场景下,建议使用读写锁sync.RWMetux
。
在临界区入口加锁完成后,直接进行defer解锁操作。