背景介绍

为什么需要sync.Map?

在Go 1.6之前, 内置map类型是部分goroutine安全,并发读没有问题,并发写可能有问题。

Go1.6之后, 并发读写map会报错,推荐用户直接用读写锁对map进行保护,也有第三方类库使用分段锁。

在1.9版本中,基于原本的map,新增了一个支持并发操作的map,叫sync map。

使用方法

Go 1.9之前解决并发读写问题方案

通过在结构体中加入读写锁,并且在读写之前先获取锁,使用完成之后释放锁

1
2
3
4
5
6
7
8
9
10
11
12
syM := struct{
sync.RWMutex
m map[string]int
}{m: make(map[string]int)}
//读数据的时候很方便的加锁:
syM.RLock()
n := syM.m["key1"]
syM.RUnlock()
//写数据的时候:
syM.Lock()
syM.m["key1"]++
syM.Unlock()

sync.Map使用方法

开箱即用

1
var sm sync.Map

添加元素 store

1
sm.Store(1,"a")

读取元素 load

1
2
3
if v,ok:=sm.Load(1);ok {
fmt.Println(v) // a
}

获取或者保存 LoadOrStore

参数是一对key:value,如果该key存在且没有被标记删除则返回原先的value(不更新)和true;不存在则store,返回该value 和false

1
2
3
4
5
6
if v,ok:=sm.LoadOrStore(1,"c");ok{
fmt.Println(v) // a
}
if v,ok:=sm.LoadOrStore(2,"c");!ok{
fmt.Println(v) // c
}

遍历map

遍历该map,参数是个函数,该函数参的两个参数是遍历获得的key和value,返回一个bool值,当返回false时,遍历立刻结束。

1
2
3
4
5
6
7
8
9
10
sm.Range(func(k,v interface{})bool{
fmt.Print(k)
fmt.Print(":")
fmt.Print(v)
fmt.Println()
return true
})
//运行结果:
//1:a
//2:c

sync.map内部使用了map

之前我们在讲解map是说过map本身是指针,因此当把sync.map当做函数参数传递,并且在函数内部就行修改,会影响到原sync.map。

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
var sm sync.Map
sm.Store(1,"a")
b := sm
fmt.Printf("%p\n",b)
changerMap(b)
fmt.Printf("%p\n",b)
}

func changerMap(b sync.Map) {
b.LoadOrStore(1,"b")
}

那么如何复制一个map?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 简单实现
func MapCopy(sm sync.Map) sync.Map {
var newMp sync.Map
sm.Range(func(k,v interface{})bool{
newMp.Store(k,v)
return true
})

return newMp
}

// 通用实现
import "encoding/gob"
func DeepCopy(dst, src interface{}) error {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(src); err != nil {
return err
}
return gob.NewDecoder(bytes.NewBuffer(buf.Bytes())).Decode(dst)
}

原理剖析

sync.map的总体结构

总结构如图所示:

sync.Map的原理其实是用空间换时间的策略,通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。

通过引入两个map将读写分离到不同的map,其中read map提供并发读和已存元素原子写,而dirty map则负责读写。

这样read map就可以在不加锁的情况下进行并发读取,当read map中没有读取到值时,再加锁进行后续读取,并累加未命中数,当未命中数大于等于dirty map长度,将dirty map上升为read map。

从结构体的定义可以发现,虽然引入了两个map,但是底层数据存储的是指针,指向的是同一份值。

sync.map结构如下:

1
2
3
4
5
6
type Map struct {
mu Mutex // 加锁作用,保护 dirty 字段
read atomic.Value // 只读的数据,实际数据类型为 readOnly
dirty map[interface{}]*entry // 最新写入的数据
misses int // 计数器,每次需要读 dirty 则 +1
}

read map 和 dirty map 的存储方式是不一致的——前者使用 atomic.Value,后者只是单纯的使用 map。

其实,这样的原因是 read map 是一个给 lock free 操作使用的数据结构,必须保证 load/store 的原子性,而 dirty map 的 load+store 操作是由 lock (就是 mu)来保护的。

lock free:无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步

read的数据结构如下:

1
2
3
4
type readOnly struct {
m map[interface{}]*entry // 内建 map
amended bool // 表示 dirty 里存在 read 里没有的 key,通过该字段决定是否加锁读 dirty
}

read map与 dirty map在读写环节的转换

在读环节,先使用read map,再使用dirty map。其中读环节可能会触发dirty map到read map的升级(将dirty map内容给到read map read map 置空)。

在修改环节,先使用read map,再使用dirty map。其中当dirty map不存在此key时,追加此key。

在新增环节,使用read map,当dirty map 为空时,会触发read map迁移到dirty map

entry的数据结构:

1
2
3
4
type entry struct {
//可见value是个指针类型,虽然read和dirty存在冗余情况(amended=false),但是由于是指针类型,存储的空间不会浪费
p unsafe.Pointer // 等同于 *interface{}
}

这里的 entry 其实就是正常 map 中的 value,但是为什么这里使用一个 entry 结构来做一层 wrapper 呢?

其实,这是因为在对某个 key 做删除的时候,会由于内建的map不是thread-safe的,所以无法达到lock free,然而通过这个entry我们可以添加一些状态来回避这个问题,从注释中可以看到 entry 有三个状态:

1.p == nil: 键值已经被删除,且 m.dirty == nil(这样我们不需要执行 non-thread-safe 的 delete(m, key) 操作)

2.p == expunged: 键值已经被删除,但 m.dirty!=nil 且 m.dirty 不存在该键值(expunged 实际是空接口指针)

3.除以上情况,则键值对存在,存在于 m.read.m 中,如果 m.dirty!=nil 则也存在于 m.dirty

为了方便理解expunged,我们可以可以看看read map与dirty map存储:

从上图中可以看出,read map 和 dirty map 中含有相同的一部分 entry,我们称作是 normal entries,是双方共享的,并且其中的 entry.p 只会是两种状态:nil、unexpunged(未删除的)

但是 read map 中含有一部分 entry 是不属于 dirty map 的,而这部分 entry 就是状态为 expunged

而 dirty map 中有一部分 entry 也是不属于 read map 的,而这部分其实是来自 Store 操作形成的(也就是新增的 entry),换句话说就是新增的 entry 是出现在 dirty map 中的。

map的加载

首先我们先看一个大概的流程图:

首先是从readonly里面读,读不到时候才加锁去 map.dirty 里面读,并且加锁之后首先是进行double check。

double check 之后即使不存在于m.read中,经过miss几次之后,m.dirty会被提升为m.read,又会从m.read中查找。所以对于更新/增加较少,加载存在的key很多的case,性能基本和无锁的map类似。

missLocked方法中可能会将m.dirty提升,m,misses会记录从readOnly中获取不到 *entry 的次数,也就是miss的次数,如果达到了 len(m.dirty) 就会原子的替换m.read.m 为 m.dirty。提升后m.dirty、m.misses重置, 并且m.read.amended为false。

下面我们详细看源码部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 1.首先从m.read中加载只读的readOnly, 从它的map中查找,无锁。
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 2. 如果没找到,并且m.dirty中有新数据,需要从m.dirty查找,这个时候需要加锁
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly) // double check
e, ok = read.m[key]
if !ok && read.amended {
// 从m.dirty查找
e, ok = m.dirty[key]
// 不管m.dirty中存不存在,都将misses计数加一
// missLocked()中满足条件后就会提升m.dirty
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
// 原子加载 *entry 所保存的value。
return e.load()
}

func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}

map的存储

首先我们先看一个大概的流程图:

整体思路就是:

1.如果从 read map 中能够找到 normal entry 的话,那么就直接 update 这个 entry 就行(lock free

2.否则,就上锁,对 dirty map 进行相关操作

其中2步骤分三种情况

2.1 double check流程:需要重新 check 一下 read map 中的内容,如果发现仍然是 expunged 的,那么会将 expunged 标记为 nil,并且在 dirty map 里面添加相应 key(这里其实就是将这个 entry 从一个 expunged 的 entry 变成了 normal entry)

2.2 dirty map处理流程:如果发现这个 key 并不属于 read,但属于 dirty 的时候,直接更新相应的值即可。

2.3 全新数据流程:在这种情况下,我们需要同时修改 read map 和 dirty map:如果 read map 没有被修改过(read.ameded==false),则意味着我们需要初始化 dirty map(read map 没有修改过表明 dirty map 有可能还未被使用,可能是 nil)。

下面我们详细看源码部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func (m *Map) Store(key, value interface{}) {
// 1. 如果m.read存在这个键,并且这个entry没有被标记删除(expunged),那么cas自旋更新value。
// 因为m.dirty也指向这个entry,所以m.dirty也保持最新的entry。
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
// 2. m.read不存在或者已经被标记删除
m.mu.Lock()
// double check
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() { //标记成未被删除
//m.dirty中不存在这个键,所以加入m.dirty
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok { // m.dirty存在这个键,更新
e.storeLocked(&value)
} else { //新键值
if !read.amended { //m.dirty中没有比m.readOnly更新的数据,往m.dirty中增加第一个新键
// 从m.read中复制未删除的数据
// 并标记m.read已经落后于m.dirty
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
//将这个entry加入到m.dirty中
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
// cas自旋更新value
func (e *entry) tryStore(i *interface{}) bool {
p := atomic.LoadPointer(&e.p)
if p == expunged { // 没有加锁,因此存在被删除的可能
return false
}
for {
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
}
}
// 从m.read中复制未删除的数据
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() { // 注意此处是cas操作,不是已经拿到锁了?原因是read map与dirty map的entry使用指针,下文delete时可能不加锁删除
m.dirty[k] = e
}
}
}

map的加载或存储

如果对应的元素存在,则返回该元素的值,如果不存在,则将元素写入到sync.Map。如果已加载值,则加载结果为true;如果已存储,则为false。

单数据不在read map 存在时,此处story过程和上述story一样。

下面我们详细看源码部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
// 不加锁的情况下读取read map
// 第一次检测
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
// 如果元素存在(是否标识为删除由tryLoadOrStore执行处理),尝试获取该元素已存在的值或者将元素写入
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}

m.mu.Lock()
// 第二次检测
// 以下逻辑和Store相同
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()

return actual, loaded
}
// 如果没有删除元素,tryLoadOrStore将自动加载或存储一个值。如果删除元素,tryLoadOrStore保持条目不变并返回ok= false。
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
// 元素标识删除,直接返回
if p == expunged {
return nil, false, false
}
// 存在该元素真实值,则直接返回原来的元素值
if p != nil {
return *(*interface{})(p), true, true
}

// 新更新元素值
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}

map的删除

首先我们先看一个大概的流程图:

这里会删除 m.dirty 对应的key-value, 但是m.read中的key-value其实并没有删除,只是设置了删除的标志为expunged。这里的惰性删除避免了重新创建 entry 实体,只用更新指针和value指针。

下面我们详细看源码部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (m *Map) Delete(key interface{}) {
// 1. 如果不存在于 m.read中,而且 m.dirty 和 m.read 数据不一致。
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
// 加锁,double check, 然后删除对应的key。
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
// 这里会删除 m.dirty 对应的key-value,
// 但是m.read中的key-value其实并没有删除,只是设置了删除的标志为expunged。
// 这里的惰性删除避免了重新创建 entry 实体,只用更新指针和value指针。
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}

map的遍历

sync.map无法使用系统提供的 for range 操作。这里采用了一个回调的操作:

下面我们详细看源码部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Map) Range(f func(key, value interface{}) bool) {
// 如果m.dirty中有新数据,则提升m.dirty,然后再遍历
read, _ := m.read.Load().(readOnly)
if read.amended {
//提升m.dirty
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}

// 遍历, for range是安全的
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}

进阶理解

优势

1.空间换时间。 通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。

2.使用只读数据(read),避免读写冲突。

3.动态调整,miss次数多了之后,将dirty数据提升为read。

4.double-checking。

5.延迟删除。 删除一个键值只是打标记,只有在提升dirty的时候才清理删除的数据。

6.优先从read读取、更新、删除,因为对read的读取不需要锁。

sync.map与map性能对比

要使用的场景

实现方式 原理 适用场景
map+Mutex 通过Mutex互斥锁来实现多个goroutine对map的串行化访问 读写都需要通过Mutex加锁和释放锁,适用于读写比接近的场景
map+RWMutex 通过RWMutex来实现对map的读写进行读写锁分离加锁,从而实现读的并发性能提高 同Mutex相比适用于读多写少的场景
sync.Map 底层通分离读写map和原子指令来实现读的近似无锁,并通过延迟更新的方式来保证读的无锁化 读多修改少,元素增加删除频率不高的情况,在大多数情况下替代上述两种实现

长度问题

sync.Map没有Len方法,并且目前没有迹象要加上 (issue#20680),所以如果想得到当前Map中有效的entries的数量,需要使用Range方法遍历一次。

大致原因是:len如果需要准确,需要使用锁,不准确提供之后意义不大。