代码版本:1.12.7

channelGo语言里面用来传递数据的一个接口

主要代码在src/runtime/chan.go

常量定义

1
2
3
4
5
const (
maxAlign = 8 //偏移量
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) //这里就是对齐一下maxAlign
debugChan = false //debug模式,开启后会输出debug信息
)

类型定义

主要就两个类型,一个是chan的底层hchan,另外一个是waitq用来记录收发队列的goroutine的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type hchan struct {
qcount uint // 缓冲区中已有元素个数
dataqsiz uint // 循环队列容量大小
buf unsafe.Pointer // 缓冲区指针
elemsize uint16 // 元素大小
closed uint32 // 关闭标记,0没关闭,1关闭
elemtype *_type // 数据项类型
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 等待接收排队链表
sendq waitq // 等待发送排队链表
lock mutex // 锁
}

type waitq struct {
first *sudog
last *sudog
}

功能实现

新建channel ch := make(chan int)

创建一个channel的时候Go的语法是ch := make(chan datatype),这个在底层的实现实际是makechan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func makechan(t *chantype, size int) *hchan {
elem := t.elem

//1、检查待存的数据类型大小,大于1<<16时异常
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
//2、检查内存对齐(降低寻址次数,提高内存读取速度),大于最大的内存对齐字节数时,panic
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//3、检查传入的size大小,大于堆可分配的最大内存时,panic,可以看出chan是在堆里面分配内存的
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// mem为0的时候将c.buf指向自己,避免竞争,具体可看raceaddr注释
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
// 数据类型不包含指针的时候,直接分配hchanSize+size个数据的大小,然后将c.buf指向数据起始地址
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
//包含指针的时候分开申请内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

//初始化数据
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}

向channel发送数据 ch <- 1

向一个chan发送数据的语法是chan <- data,在底层会被转为chansend1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}

/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
//c是当前channel,ep是数据的指针,block代表是否阻塞,默认都是阻塞,只有在select中是非阻塞,callerpc是取的PC寄存器
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//如果当前channel是nil,在非阻塞的情况下会返回false,阻塞情况下会报错,按照历史提交来看的话,throw之前是return false,也就是再调度一轮后返回发送失败,现在的版本是直接在调度的时候就报错了
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

if debugChan {
print("chansend: chan=", c, "\n")
}

if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
// 如果是非阻塞,并且channel未关闭,并且(缓冲区为空也没有接受者或者缓冲区放满了),直接返回false
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

// 如果channel关闭了,报错
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

//从接收者队列里面去取一个,直接把ep复制到接收者中,具体看send函数分析,然后返回成功
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

//如果没有接收者,缓冲区有空位,就放入缓冲区中
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

//没发出去,非阻塞直接返回
if !block {
unlock(&c.lock)
return false
}

// Block on the channel. Some receiver will complete our operation for us.
//阻塞的就将自己休眠进入调度

//获取当前g
gp := getg()
//拿一个可用g
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 初始化一下
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 放入发送队列中
c.sendq.enqueue(mysg)
// 开始休眠调度
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
//保活
KeepAlive(ep)

// someone woke us up.
// 唤醒后gp.waiting应该等于mysg
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
// 唤醒的时候有传gp.param,所以判断下
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 释放g
mysg.c = nil
releaseSudog(mysg)
return true
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
// c是当前channel,sg是从接收队列中取出的g,ep是数据指针(必须是非空并且指向队或者调用放的栈上),unlockf解锁函数,发送后解锁用, skip作用未知
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
// 循环队列置位
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
// 接收者数据指针不为空就直接发送,具体看sendDirect分析,发送完后置空
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// 获取接受者的g
gp := sg.g
unlockf()
// param赋值
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将接受者的g置为ready状态,让其可以调度
goready(gp, skip+1)
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.
// 因为按照规定,当前g是不能写别的g的栈的,所以需要一些处理
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src is on our stack, dst is a slot on another stack.

// Once we read sg.elem out of sg, it will no longer
// be updated if the destination's stack gets copied (shrunk).
// So make sure that no preemption points can happen between read & use.
dst := sg.elem
// 写屏障处理
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// No need for cgo write barrier checks because dst is always
// Go memory.
// 直接复制过去
memmove(dst, src, t.size)
}

从channel接收数据<- ch,a =<- ch,a,ok = <- ch

整体跟发送差不多,底层对应两种情况chanrecv1chanrecv2

chanrecv1对应着<- ch,a =<- ch,赋值的操作还是在汇编做的

chanrecv2则对应着a,ok =<- ch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.

if debugChan {
print("chanrecv: chan=", c, "\n")
}

//当前chan为空,非阻塞直接返回
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
//判断下是否有非阻塞返回的情况
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

// 如果关闭了并且缓冲区为空,直接返回
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

// 从发送者队列里面去取一个,直接把值复制到ep中,具体看recv函数分析,然后返回成功
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

// 如果缓冲区有值,从缓冲区取
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

// 都没取到,非阻塞直接返回
if !block {
unlock(&c.lock)
return false, false
}

// no sender available: block on this channel.
// 阻塞的就将自己休眠进入调度
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 初始化一下g
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
// and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
// written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果无缓冲
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
// 如果接收者有接收值,直接从发送者复制一份到ep中
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
// 从缓冲区取值
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
// 如果接收者有接收值,复制一份到ep中
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
// 循环队列置位
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将发送者的g置为ready状态,让其可以调度
goready(gp, skip+1)
}

// 与sendDirect 一样
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// dst is on our stack or the heap, src is on another stack.
// The channel is locked, so src will not move during this
// operation.
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}

关闭channel close(ch)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
//c是当前channel
func closechan(c *hchan) {
// 当前channel为空直接报错
if c == nil {
panic(plainError("close of nil channel"))
}

lock(&c.lock)
// 之前已经关过
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}

// 置位关闭状态
c.closed = 1

var glist gList

// release all readers
// 释放所有的接收方
for {
// 从读取队列里取一个g
sg := c.recvq.dequeue()
if sg == nil {
break
}
// 接收者接收值不为空清理一下
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 添加到glist等待释放
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}

// release all writers (they will panic)
// 释放所有发送者,会panic
for {
// 从发送者队列取
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 直接置空发送者的值,方便gc回收
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 添加到glist等待释放
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)

// Ready all Gs now that we've dropped the channel lock.
// 将所有g置为ready状态让其可以被调度
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}

select相关

在底层select会被转成对应的语法来调用上面的chansend,chanrecv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// compiler implements
//
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
// select {
// case v = <-c:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selectnbrecv(&v, c) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false)
return
}

// compiler implements
//
// select {
// case v, ok = <-c:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if c != nil && selectnbrecv2(&v, &ok, c) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
// TODO(khr): just return 2 values from this function, now that it is in Go.
selected, *received = chanrecv(c, elem, false)
return
}