Golang Chan 作者: nbboy 时间: 2021-05-10 分类: 软件架构,软件工程,Golang # Golang Chan > *Do not communicate by sharing memory; instead, share memory by communicating.* **分析版本:1.15.2** ### 设计 chan主要分为带缓存chan和不带缓存的chan,带缓存的chan设计是遵循FIFO特性的,在Go中大多数情况会使用chan通信。不带缓冲的chan,可以通过不指定cap的方式来创建,这种chan其实是阻塞的同步工具,如果没有另外一端去接收,或者关闭则阻塞在这里。 ```go ch :=make(chan struct{}) ``` 可以通过指定cap的值来让其变成一个带缓冲的chan ```go ch :=make(chan struct{}, 3) ch <- struct{}{} ``` 如上代码并不会让程序进入阻塞状态。有时候,可以利用chan的这一特性作goroutine之间的通信。 ```go func main() { ch := make(chan int, 3) go func() { for { select { case ele := <-ch: fmt.Println("value ", ele) return } } }() ch <- 1 time.Sleep(5 * time.Second) } ``` 一个读一个写,写入后是存在缓冲里的,在后面实现分析中能看到这一点。再来修改下代码,直接关闭chan。 ```go func main() { ch := make(chan int, 3) go func() { for { select { case ele, ok := <-ch: if !ok { fmt.Println("closed ") } else { fmt.Println("value ", ele) } return } } }() close(ch) //close(ch) panic time.Sleep(5 * time.Second) } ``` 通道已经被关闭,接收端能够感知到,需要注意的是,如果通道被关闭两次,第二次就会panic。 ### 安全关闭 ### 双向通道与单向通道 - `chan T` 表示双向通道 - `chan<- T` 表示只写通道 - `<-chan T` 表示只读通道 两个goroutine之间同步可以用chan,也可以用传统的同步语义,比如mutex,atomic,sema等等。 ### hchan结构与实现 chan的结构定义在chan.go文件里 ```go type hchan struct { qcount uint // 队列中总的数量 dataqsiz uint // 环形队列元素数量 buf unsafe.Pointer // 指向环形队列的元素首地址 elemsize uint16 closed uint32 elemtype *_type // 元素类型 sendx uint // 发送操作处理的位置 recvx uint // 接收操作处理的位置 recvq waitq // 接收等待者列表 sendq waitq // 发送等待者列表 lock mutex //用锁保护成员 } ``` 通过make去创建chan,实际上会调用chan.go->makechan函数 ```go func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. // 元素值的大小不能大于2的16次 if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 判断下总字节数,超过直接panic mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0://size如果是0,分配基本hchan就可以了 // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0://如果带缓冲的chan,不包含指针,就紧接着hchan分配了存储元素用的空间 // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize)//指向了元素的地址 default://如果包含指针,内存托管到gc管理 // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c } ``` - 如果元素个数是0(不指定)的情况下,只分配管理结构hchan的内存空间,就是不带缓冲的情况 - 如果元素类型不是指针,那就除了分配管理结构hchan,另外再分配一段连续内存给hchan,其紧挨hchan分配 - 如果元素类型是指针,分配hchan,另外再分配元素空间,而且hchan的内存给gc进行托管,这里具体不是很明白,到时候看gc内存的时候在回过来看 ##### Channel操作 根据操作和chan的各种情况先汇总一下 | **Operation** | **A Nil Channel** | **A Closed Channel** | **A Not-Closed Non-Nil Channel** | | ---------------------- | ----------------- | -------------------- | -------------------------------- | | **Close** | panic(I) | panic(F) | succeed to close(C) | | **Send Value To** | block for ever(H) | panic(E) | block or succeed to send(B) | | **Receive Value From** | block for ever(G) | never block(D) | block or succeed to receive(A) | 这个表格清晰得示意了各种情况,现在深入代码看看发送和接收,关闭。 ```go /* * generic single channel send/recv * If block is not nil, * then the protocol will not * sleep but return if it could * not complete. * * sleep can wake up with g.param == nil * when a channel involved in the sleep has * been closed. it is easiest to loop and re-run * the operation; we'll see that it's now closed. */ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block {//非阻塞模式直接返回 return false } //如果chan是nil,并且是阻塞模式下,则直接阻塞,这里对应表格中H那种情况 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } //... if !block && c.closed == 0 && full(c) {//fast path,如果是还没关闭的chan,并且已经缓冲已经满了,则直接返回false return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) //如果chan已经被close,则panic,对应表格中的E情况 if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } //如果有接受者在等待,则直接传递数据 if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). // 拷贝数据到调用者栈或者唤醒调用者 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } //如果缓冲队列里还有空间,则把数据移动到缓冲队列中 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } //如果不需要陷入阻塞模式,则返回 if !block { unlock(&c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. gp := getg() //获取一个sudog对象 mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) //阻塞当前g gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) //... releaseSudog(mysg) return true } ``` 根据接收者队列和buff队列的数据情况,chan发送可以分成三种情况。 - 如果有接受者在等待,则直接拷贝数据到接受者,并且唤醒接受者。 - 如果缓冲队列里还有空间,则把数据移动到缓冲队列中。 - 如果不是上面两种情况,并且chan是在同步模式,则g开始阻塞,等待接收者准备接收。 相应的接收流程也差不多,如下所示 ```go // chanrecv receives on channel c and writes the received data to ep. // ep may be nil, in which case received data is ignored. // If block == false and no elements are available, returns (false, false). // Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, fills in *ep with an element and returns (true, true). // A non-nil ep must point to the heap or the caller's stack. // 注释中说的很明白,ep可能为空,则接收的数据被忽略。如果block为flase,并且没有数据是有效的,则直接返回flase // 否则chan是被关闭,则也直接返回,最后一种情况就是正常的情况了,携带数据返回。 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { //... if c == nil { if !block { return } //接收者阻塞,对应G情况 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // Fast path: check for failed non-blocking operation without acquiring the lock. // 快速路径,在不阻塞情况下,判断是否空 if !block && empty(c) { // 多个goroutine重排情况下,会有问题,这里加锁保护下 if atomic.Load(&c.closed) == 0 { return } // 再次去检测chan是否为空 if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) //chan已经关闭,并且没有元素,则直接返回 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } //发送队列里有发送者在等待,则直接传递数据 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). // 找到一个发送者,如果buff size是0,则直接拷贝就可以,否则从buf 的队列头去读取,其实在满的情况下,tail和head指向的是同一个地方 // buf 这里一定是满的,想想为什么 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } //队列里面有元素,则从队列里面读取 if c.qcount > 0 { // Receive directly from queue // 移动指针到接收位置 qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } //如果不是阻塞模式,就直接返回 if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil //接受者进入接受者等待队列 c.recvq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) //... releaseSudog(mysg) return true, !closed } ``` 接收动作也存在三种情况 - 发送队列里有发送者在等待,则直接拷贝数据,并且唤醒发送者。 - buff队列里面有元素,则从队列里面读取。 - 如果不属于上面两种情况,并且是在同步模式下,则g开始阻塞,并且等待发送者准备发送。 close操作就是对数据的释放,包括发送等待者队列和接收等待者队列等资源。 ```go func closechan(c *hchan) { //如果对一个nil chan进行关闭,则panic,对应表里情况I if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) //如果对一个已经关闭的chan再次关闭,则panic,对应情况F if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } c.closed = 1 var glist gList // release all readers // 释放所有的接收者队列 for { sg := c.recvq.dequeue() if sg == nil { break } //清除传递的数据 if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // 释放所有的发送者队列 // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. // 把所有等待的g状态都改为ready状态 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } } ``` 通过阅读代码,可以看到: - 关闭一个nil chan和一个已经关闭的chan,则直接panic - 发送等待队列和接收等待队列都重置 - 所有等待队列都设置为Ready状态,意味着调度器可以调度这些G了 ##### 环形缓冲队列 假设创建了带缓冲的channel ```go ch := make(chan int, 6) ``` ch初始化后,数据位置是这样的,Sendx和Recvx都指向起点,即sendx=recvx=1 ``` | Sendx | | v +-------+-------+-------+-------+-------+-------+ | | | | | | | BUF | | | | | | | | | | | | | +-------+-------+-------+-------+-------+-------+ ^ | | Recvx | ``` 在向ch发送一个数据后,先判断qcount是否超过dataqsiz(qcount0),如果有数据,则读取recvx位置上的数据,并且recvx+=1,并且qcount-=1。同样的情况,如果recvx到达末尾,则也重置为0。 ``` | Sendx | | v +-------+-------+-------+-------+-------+-------+ | | | | | | | BUF | | | | | | | | | | | | | +-------+-------+-------+-------+-------+-------+ ^ | | Recvx | ``` ##### FIFO特性 接收的时候是如何保证FIFO特性的,看下相关代码 ```go // recv processes a receive operation on a full channel c. // There are 2 parts: // 1) The value sent by the sender sg is put into the channel // and the sender is woken up to go on its merry way. // 2) The value received by the receiver (the current G) is // written to ep. // For synchronous channels, both values are the same. // For asynchronous channels, the receiver gets its data from // the channel buffer and the sender's data is put in the // channel buffer. // Channel c must be full and locked. recv unlocks c with unlockf. // sg must already be dequeued from c. // A non-nil ep must point to the heap or the caller's stack. // chan 一定是满的情况下才会处理 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 {//同步chan,则直接拷贝 if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. // buf中接收数据的位置 qp := chanbuf(c, c.recvx) //... // copy data from queue to receiver // 从队列里,拷贝数据到接受者 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } //发送者设置为read状态,准备调度 goready(gp, skip+1) } ``` 可以看到这里也分成2种模式去处理 - 在同步模式下,则直接从发送者拷贝数据到接收者 - 在异步模式下,则会优先去buff head中拿数据,并且拷贝发送者数据到buff tail,并且指针都下移 用直观的图来示意一下异步模式下的操作过程,假设现在chan中存储了如下数据,buff中数据其实已经存储满了,并且指针sendx=recvx=1 ``` +--------------------------------+ |dataqsiz=6,qcount=6,sg.elem='g' | +--------------------------------+ | Sendx | | v +-------+-------+-------+-------+-------+-------+ | | | | | | | BUF f | a | b | c | d | e | | | | | | | | +-------+-------+-------+-------+-------+-------+ ^ | | Recvx | ``` 在操作之后,buff数据是这样的,相当于在头部执行了一次dequeue,然后在尾部执行一次enqueue ``` +--------------------------------+ |dataqsiz=6,qcount=6,sg.elem='g' | +--------------------------------+ | Sendx | | v +-------+-------+-------+-------+-------+-------+ | | | | | | | BUF f | g | b | c | d | e | | | | | | | | +-------+-------+-------+-------+-------+-------+ ^ | | Recvx | ``` ##### Refs Go_ Buffered and Unbuffered Channels _ by Vincent Blanchon _ A Journey With Go _ Medium Share Memory By Communicating - The Go Blog https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/ https://go101.org/article/channel-closing.html https://go101.org/article/channel-use-cases.html https://go101.org/article/channel.html 标签: Golang, Channel, FIFO
评论已关闭