背景介绍

使用互斥锁,可以避免多个 Go 协程同时访问修改共享资源的代码。但是对于多个协程同时读的操作,没有修改共享资源,不需要添加互斥锁,互斥锁在这种场景下,会大幅度降低程序性能。

在这种场景下,我们真正需要具有读并发、互斥写、读写互斥等特点的逻辑,读写锁是其中的一种实现。

常见用法

声明一个读写锁

1
var m *sync.RWMutex

读加锁和读解锁

1
2
m.RLock()
m.RUnlock()

写加锁和写解锁

1
2
m.Lock()
m.Unlock()

读写锁拥有以下特点:

  1. 并发读;可以同时加多个读锁
  2. 互斥写;写逻辑互斥,在任何一个时间点都一个协程获取到写锁。
  3. 读写互斥;当写锁加锁成功时,获取读锁的协程休眠,等待写锁释放。 当存在读加锁时,获取写锁的协程休眠,等待所有读锁释放。

来看一个完整的case:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"sync"
"time"
)

var m *sync.RWMutex

func main() {
m = new(sync.RWMutex)

go read(0)
go write(1)
go read(2)

time.Sleep(2*time.Second)
}

func read(i int) {
println(i,"read start")

m.RLock()
println(i,"reading")
time.Sleep(1*time.Second)
m.RUnlock()

println(i,"read over")
}

func write(i int) {
println(i,"write start")

m.Lock()
println(i,"writing")
time.Sleep(1*time.Second)
m.Unlock()

println(i,"write over")
}

原理剖析

整体结构

互斥源码所在位置:go/src/sync/rwmutex.go, RWMutex结构如下:

1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // 互斥锁
writerSem uint32 // 写锁信号
readerSem uint32 // 读锁信号(当写锁加上时,所有获取读锁的goutinue都会监听该信号,进行休眠等待。 写锁释放时,唤醒所有监听该信号的goroutinue)
readerCount int32 // 获取到读锁的数量
readerWait int32 // 等待释放读锁的数量
}

写锁

写锁加锁

写锁加锁逻辑:

  1. 获取读写锁的读锁数量,将读锁数量置为一个负数,准确值是:readerCount-rwmutexMaxReaders

  2. 如果读锁数量不为0,更新readerWait字段,将当前协程(获取写锁的线程)进行休眠,监听writerSem信号,等待所有读锁数量释放。

在读锁解锁逻辑里面,我们可以看到readerWait为0时,发出writerSem信号,唤醒获取写锁的协程。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (rw *RWMutex) Lock() {
if race.Enabled { // 竞态分析,忽略
_ = rw.w.state
race.Disable()
}
rw.w.Lock() // 真实上锁,如果这里获取不到,在这里就会休眠。(见metux锁实现)
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // 获取读锁数量,且将读锁数量置为一个负数,这里为什么要置为一个负数?见后续

if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { // 如果读锁数量不为0, 将读锁数量累加到readerWait待释放读锁数量中,进行休眠(等待所有读锁数量释放,再进行后续操作)
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled { // 竞态分析,忽略
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}

写锁解锁

写锁解锁逻辑:

  1. 将读锁数量恢复正常,即负数->正数唤醒
  2. 发出readerSem信号,唤醒所有监听该信号的协程
  3. 执行解锁逻辑

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (rw *RWMutex) Unlock() {
if race.Enabled { // 竞态分析,忽略
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}

// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) // 将读锁数量加上rwmutexMaxReaders,恢复正常数量,获取读锁数量

if r >= rwmutexMaxReaders { // 读锁加锁数量如果大于最大读数量,抛出异常
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}

for i := 0; i < int(r); i++ { // 唤醒所有等待读锁的goroutinue(一旦写锁加上后,所有获取读锁的操作都会休眠)
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock() // 真实解锁,这里会唤醒其他等待写锁的goroutinue(见metux锁实现)
if race.Enabled { // 竞态分析,忽略
race.Enable()
}
}

读锁

读锁加锁

读锁加锁逻辑:

  1. 将读锁数量加一
  2. 如果读锁数量为负数时,即存在获取写锁的操作,将当前goroutinue休眠,监听readerSem信号,等待写锁释放时唤醒

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
func (rw *RWMutex) RLock() {
if race.Enabled { // 竞态分析,忽略
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 { // 读锁数量+1,如果读锁数量为负(某个routinue获取写锁,后面所有读锁只能休眠,等待写锁的完成),休眠当前gouroutinue
runtime_SemacquireMutex(&rw.readerSem, false, 0) // 这里readerSem 数量为自增1
}
if race.Enabled { // 竞态分析,忽略
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}

读锁解锁

读锁写锁逻辑:

  1. 将读锁数量减一
  2. 读锁数量为负数,即当前存在获取写锁的goroutinue逻辑,将readerWait减一。
  3. readerWait == 0 即:获取所有人都释放读锁,发出writerSem信号,唤醒获取写锁的goroutinue。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (rw *RWMutex) RUnlock() {
if race.Enabled { // 竞态分析,忽略
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // 读锁数量-1;小于0情况有两种,1.为加读锁,直接解锁。2. 某个线程将要获取写锁
rw.rUnlockSlow(r)
}
if race.Enabled { // 竞态分析,忽略
race.Enable()
}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders { // 如果r+1 == 0, 即r == -1, 未加读锁,直接解锁。 异常
race.Enable() // 如果r+1 == -rwmutexMaxReaders, 即r == -(rwmutexMaxReaders+1),这里就算是某个goroutinue将要获取写锁,将readerCount 置为 readerCount-rwmutexMaxReaders,这个数字也不会小于-rwmutexMaxReaders 异常
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 { // 某个goroutinue将要获取写锁,这里如果readerWait == 0,即 所有读锁全部解锁了,唤醒获取写锁的goroutinue
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}