Git模型简述 作者: nbboy 时间: 2021-05-18 分类: 软件架构,软件工程,设计模式 评论 > 每个团队根据自己的情况决定采用哪种开发模型,然后团队成员一起去遵循。 ### Gitflow gitflow.pn 借用网上的一副图来描述下各个分支的作用,这幅图基本概括了Gitflow的分支作用。 ###### Master分支 只有一个主分支,用来交付正式的发布版本,该分支不是来自直接的提交记录,而是由Release分支和Hotfix分支合并过来的。 ###### Develop分支 开发分支,也应该只有一个开发分支。该分支从Master分支或者Hotfix分支克隆,并且接收Feature分支和Release分支的合并。 ###### Feature分支 即功能分支,每个新功能都应该建立Feature分支,该分支从Develop克隆,并且合并到Develop后删除。 ###### Release分支 即预发布分支,在正式发布前一般会先发布到预发布环境进行测试,有问题会在该分支上进行修改,直到没问题就合并到Master分支,并打上一个版本号。 ###### Hotfix分支 即补丁分支,主要对线上Master分支进行Bug修复,然后会合并到Master分支和Develop分支。 Gitflow相对来说操作复杂一点,但是各个分支都很明确,一般开源社区会采用Github flow,下面我们来介绍下这个Git模型。 ### Github flow 这个模型下其实只有一个Master长线分支,需要开发功能或者修改Bug都从Master分支克隆。如果要申请合并到Master就需要开一个PR(Pull Request),这个相当于一个请求,其他CodeReview用户和作者可以在这个PR上进行讨论,而且在这个过程中,作者还可提交代码到PR。如果这个PR被接收,则会被合并到Master。 这个过程其实简单了不少,所有合并操作都是围绕PR进行,讨论或者CodeReview都亦然。可以通过官网的介绍页更深入体会一下https://guides.github.com/introduction/flow/。 ### 参考 https://segmentfault.com/a/1190000021929465 https://www.ruanyifeng.com/blog/2015/12/git-workflow.html https://www.atlassian.com/git/tutorials/comparing-workflows/gitflow-workflow https://guides.github.com/introduction/flow/
Golang内存分配机制 作者: nbboy 时间: 2021-05-10 分类: 软件架构,软件工程,Golang 评论 # Golang内存分配机制 Go的内存分配分为微内存分配,小内存分配,大内存分配,微内存为小于16字节的内存分配,小内存则为大于16字节小于32KB的内存分配,大内存是大于32KB的内存分配。 ## Page与Span 在Golang中一个**内存页**为8K,其为Golang内存分配器以此为倍数单位大小去申请,比如申请16G。Span为一个或者多个连续的内存页组成,可以想象成如下结构: span.pn ## 小内存分配 ### mCache Go的内存分配采用的类似TCMalloc一样的分配策略,就是对需要分配的内存按照尺寸大小进行分级,按照不同的SizeClass分为<=8KB,<=16KB,<=32KB。 一个Goroutine去申请内存时,就是在其对应的P的mCache中去申请,申请的基本单元称为mSpan,根据mSpan的大小找到最接近其的内存块返回即可。因为其实在P内部去分配的,所有过程中不需要加全局锁,性能会快很多。 mCache.pn ### mCentral 当然mCache内存块也会分配光,所以它会向mCentral去要,mCentral有两个双向链表组成,一个用来可以被用来分配的内存块组(Non Empty List),另外一个用来存放已经分配出去的内存块组(Empty List),但是mCentral分配和释放的时候是需要加全局锁的,因为它也是所有P共享的。比如分配的流程是这样的: 1. 获得全局锁GL 2. 在Non Empty List中获得一块内存 3. 在Empty List记录下这块内存 4. 释放全局锁GL mCentral.pn ### mHeap 当mCentral也没有多余的内存的时候,就需要向mHeap进行申请,而mHeap中存放的是未被切割成一块一块的大的内存块,其称为Arena内存块,在mHeap中以二维数组的形式进行存放。 mHeap.pn ## 微内存分配 对于很多频繁申请并且尺寸很小的 内存需求我们叫Tiny对象,其如果每次都是申请然后马上释放,这样会造成空间浪费,所以Golang对这块做了一些优化。想小字符串,或者临时的逃逸变量等等,都会按照微内存来分配,用完之后该空间还能继续使用。 可以把很多Tiny对象打包成一个大对象,然后统一分配一个小内存给该区域就可以。 tiny_obj.pn ## 大内存分配 大内存指的就是大于32Kb的内存,这部分内存分配频率不会太高,但是尺寸可能往往很大,所以直接让mHeap去分配,如果mHeap中的Arena内存块不够用,再向操作系统申请虚拟内存即可。 ## 总结 Golang内存分配主要思想还是按照不同的尺寸去获取最适合的内存块,和Memcached的Slab内存分配策略有一些像,主要目的就是减少内存碎片。 ### 参考 毛大的PPT https://draveness.me/golang/docs/part3-runtime/ch07-memory/golang-memory-allocator/ https://zhuanlan.zhihu.com/p/59125443 未读
Golang Context 作者: nbboy 时间: 2021-05-10 分类: 软件架构,设计模式,C,Golang 评论 # Context > 使用上下文的一个很好的心理模型是它应该在您的程序中流通,想象一条河或流水。 **分析版本:1.15.2** ### Context context接口只有4个方法,先看下接口定义 ```go type Context interface { //超时时间 Deadline() (deadline time.Time, ok bool) //需要监听的通道 Done() <-chan struct{} //如果没有关闭,则返回nil,否则返回一个错误值 Err() error //指定key的value Value(key interface{}) interface{} } ``` 目前版本中,实现的struct为emptyCtx,cancelCtx,timerCtx,valueCtx,每个ctx对应的应用场景都不一样,先看下最简单的emptyCtx ### emptyCtx ```go var ( background = new(emptyCtx) todo = new(emptyCtx) ) func Background() Context { return background } func TODO() Context { return todo } ``` TODO和Background用的都是emptyCtx,Background主要被用来作为其他Ctx的根,而TODO主要可以视为一种nil的Ctx去用,因为在官方的设计中,不允许使用nil作为Ctx的值。emptyCtx的实现非常简单,不做具体介绍,都是空的方法体。 先看下比较常用的cancelCtx的使用方法 ### cancelCtx ```go gen := func(ctx context.Context) <-chan int { dst := make(chan int) n := 1 go func() { for { select { case <-ctx.Done()://取消后从这里返回 fmt.Println("ctx.done") return case dst <- n: n++ } } }() return dst } ctx, cancel := context.WithCancel(context.Background()) for n := range gen(ctx) { fmt.Println(n) if n == 5 { //达到目标,取消ctx cancel() break } } time.Sleep(3 * time.Second) ``` cancelCtx主要用来控制goroutine的生命周期,即什么时候结束生命周期,当然这个需要goroutine本身去配合,select Done返回的通道。再看下,cancelCtx的内部结构 ```go type cancelCtx struct { Context mu sync.Mutex // protects following fields 用来保护成员 done chan struct{} // created lazily, closed by first cancel call 就是Done()返回的chan,调用cancel()后就被关闭 children map[canceler]struct{} // set to nil by the first cancel call 子ctx,所有ctx会组成一颗树形结构,而此处指向其孩子节点 err error // set to non-nil by the first cancel call 调用cancel()后,被设置成取消原因 } ``` 这里看下cancelCtx的构建函数 ```go func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { if parent == nil { panic("cannot create context from nil parent") } //创建cancelCtx,关联传递进来的ctx c := newCancelCtx(parent) propagateCancel(parent, &c) return &c, func() { c.cancel(true, Canceled) } } func propagateCancel(parent Context, child canceler) { //首先检查父ctx是否关闭 done := parent.Done() if done == nil { return // parent is never canceled } select { case <-done: // parent is already canceled // 如果父ctx被取消,则也同步取消子ctx child.cancel(false, parent.Err()) return default: } //如果找到了cancelCtx if p, ok := parentCancelCtx(parent); ok { p.mu.Lock() if p.err != nil {//父节点已经被取消 // parent has already been canceled child.cancel(false, p.err) } else {//如果没被取消,则把子节点挂到父节点 if p.children == nil { p.children = make(map[canceler]struct{}) } p.children[child] = struct{}{} } p.mu.Unlock() } else { //如果是自定义的ctx,就会开启一个goroutine去监听父的取消事件,并且取消子ctx atomic.AddInt32(&goroutines, +1) go func() { select { case <-parent.Done(): child.cancel(false, parent.Err()) case <-child.Done(): } }() } } func parentCancelCtx(parent Context) (*cancelCtx, bool) { done := parent.Done() //如果关闭,则返回false if done == closedchan || done == nil { return nil, false } //看下是否cancelCtx p, ok := parent.Value(&cancelCtxKey).(*cancelCtx) //没找到则返回false if !ok { return nil, false } p.mu.Lock() ok = p.done == done p.mu.Unlock() //如果不一样也返回flse if !ok { return nil, false } //通过深层查找,找到了cancelCxt,则才返回true return p, true } ``` - 父Ctx如果不需要被取消,则直接返回,Background,TODO就是不需要被取消的类型 - 如果父ctx被取消,则也同步取消子ctx - parentCancelCtx会深层次得去找父cancelCtx,这里分两种情况 1)如果是标准(cancelCtx,timerCtx)则会同步父子Ctx的状态(要么都同步取消,要么建立关系) 2)如果是自定义Ctx,就会开启一个goroutine去监听父的取消事件,并且取消子ctx 这里这么做的原因,就是需要把子节点的状态和父节点要同步,调用withCancel()返回的cancel函数其实是调用cancelCtx.cancel()函数 ```go // cancel closes c.done, cancels each of c's children, and, if // removeFromParent is true, removes c from its parent's children. func (c *cancelCtx) cancel(removeFromParent bool, err error) { if err == nil { panic("context: internal error: missing cancel error") } c.mu.Lock() if c.err != nil { c.mu.Unlock() return // already canceled } c.err = err if c.done == nil { c.done = closedchan } else { //关闭chan close(c.done) } //子ctx依次进行cancel for child := range c.children { // NOTE: acquiring the child's lock while holding parent's lock. child.cancel(false, err) } c.children = nil c.mu.Unlock() //从根节点里移除c if removeFromParent { removeChild(c.Context, c) } } ``` 注释中说的很明白,会把打开的chan关闭,然后依次调用子ctx的cancel,所以如果我们忘记调用cancel,其实会有大量的chan没被close掉,然后造成资源的浪费! 此处试下级联取消,代码如下 ```go func main() { ctx1, cancel1 := context.WithCancel(context.Background()) defer cancel1() ctx2, cancel2 := context.WithCancel(ctx1) defer cancel2() ctx3, cancel3 := context.WithCancel(ctx2) defer cancel3() go func() { select { case <-time.After(3 * time.Second): cancel1() } }() <-ctx3.Done() } ``` 创建了三个ctx,然后第一个ctx取消后,其下的所有ctx都会取消。需要注意,代码中其实ctx1被cancel了两次,通过了解实现的代码,知道这么写其实并没有什么问题。画一个图,直观了解下3个ctx组成的结构。取消是沿着继承链,从除了根部外(Background不能被取消)一直到所有节点执行取消操作! ┌───────────────────┐ │ ┌───────────────┐ │ │ │ Background │ │ │ └───────────────┘ │ └─────────┬─────────┘ │ │ │ │ │ ┌─────────▼─────────┐ │ │ ┌───────────────┐ │ │ │ │ Ctx1 │ │ │ │ └───────────────┘ │ │ └─────────┬─────────┘ │ │ │ │ │ │ │ │ │ ┌─────────▼─────────┐ │ │ ┌───────────────┐ │ Cancel │ │ Ctx2 │ │ │ │ └───────────────┘ │ │ └─────────┬─────────┘ │ │ │ │ │ │ │ │ │ ┌─────────▼─────────┐ │ │ ┌───────────────┐ │ │ │ │ Ctx3 │ │ │ │ └───────────────┘ │ ▼ └───────────────────┘ ### timerCtx 其实Context最牛的功能我觉得还是timerCtx,先来看一下这个功能一个简单的例子。 ```go func main() { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() result := make(chan int, 3) for i := 0; i < 10; i++ { go func(i int) { for { select { case <-ctx.Done(): fmt.Println("return") return case result <- i: } } }(i) } for { select { case r := <-result: fmt.Println(r) case <-ctx.Done(): return } } } ``` 其实这个ctx就是定义了一个具有超时功能的上下文,一般可以应用在可能会长时间执行的任务上,如果该任务长时间执行,我们可以设置一个ctx,超时时间到来,goroutine就从该任务返回,不会造成任务失控的情况。继续看下timerCtx的结构 ```go // A timerCtx carries a timer and a deadline. It embeds a cancelCtx to // implement Done and Err. It implements cancel by stopping its timer then // delegating to cancelCtx.cancel. // 通过计时器去实现任务的取消 type timerCtx struct { //内部也是继承了cancelCtx,所以也具有取消的能力 cancelCtx timer *time.Timer // Under cancelCtx.mu. //超时时间点 deadline time.Time } ``` WithTimeout其实内部会调用WithDeadline,我们分析下该方法 ```go func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) { if parent == nil { panic("cannot create context from nil parent") } //父节点早于子节点指定时间,直接返回父节点,因为后面设置其实没意义 if cur, ok := parent.Deadline(); ok && cur.Before(d) { // The current deadline is already sooner than the new one. return WithCancel(parent) } c := &timerCtx{ cancelCtx: newCancelCtx(parent), deadline: d, } propagateCancel(parent, c) dur := time.Until(d) //时间已经到了,就直接cancel if dur <= 0 { c.cancel(true, DeadlineExceeded) // deadline has already passed return c, func() { c.cancel(false, Canceled) } } c.mu.Lock() defer c.mu.Unlock() //用定时器去处理延迟cancel if c.err == nil { c.timer = time.AfterFunc(dur, func() { c.cancel(true, DeadlineExceeded) }) } return c, func() { c.cancel(true, Canceled) } } ``` 这里有3点要注意: 1. 如果子Ctx超过了父Ctx则,直接使用父Ctx 2. 如果时间已经到期,则直接Cancel 3. 否则就注册一个定时器在未来一个时间执行 这里比较关心的是,他内部其实维护了一个定时器,就是那么简单而已!!!在分析一下对应的cancel方法 ```go func (c *timerCtx) cancel(removeFromParent bool, err error) { c.cancelCtx.cancel(false, err) if removeFromParent { // Remove this timerCtx from its parent cancelCtx's children. removeChild(c.cancelCtx.Context, c) } c.mu.Lock() //关闭定时器 if c.timer != nil { c.timer.Stop() c.timer = nil } c.mu.Unlock() } ``` 比cancelCtx多了停止定时器的操作。 ### valueCtx WithValue很容易建立valueCtx,valueCtx结构如下 ```go // A valueCtx carries a key-value pair. It implements Value for that key and // delegates all other calls to the embedded Context. type valueCtx struct { Context key, val interface{} } func WithValue(parent Context, key, val interface{}) Context { if parent == nil { panic("cannot create context from nil parent") } if key == nil { panic("nil key") } if !reflectlite.TypeOf(key).Comparable() { panic("key is not comparable") } return &valueCtx{parent, key, val} } ``` WithValue只能通过添加的方式把父Ctx和新Ctx建立连接,很显然整个Ctx看起来应该就是一棵树一样。比如如下的代码 ```go type TrackId string func main() { ctx1 := context.WithValue(context.Background(), TrackId("2021"), "123456") ctx2 := context.WithValue(ctx1, TrackId("2020"), "111111") ctx3 := context.WithValue(ctx1, TrackId("2020"), "222222") ctx4 := context.WithValue(ctx2, TrackId("2019"), "333333") ctx5 := context.WithValue(ctx2, TrackId("2019"), "444444") ctx6 := context.WithValue(ctx3, TrackId("2018"), "555555") ctx7 := context.WithValue(ctx3, TrackId("2018"), "666666") var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() fmt.Println("ctx4 ", ctx4.Value(TrackId("2019"))) fmt.Println("ctx5 ", ctx5.Value(TrackId("2019"))) fmt.Println("ctx6 ", ctx6.Value(TrackId("2020"))) fmt.Println("ctx7 ", ctx7.Value(TrackId("2021"))) }() wg.Wait() } ``` 其实就是会建立这样一棵树结构 ``` ┌────────────────────────┐ │ ┌********************┐ │ │ * Background * │ │ └********************┘ │ └────────────┬───────────┘ │ │ ┌────────────▼───────────┐ │ │ │ ctx1(val:123456) ◀──────Step3 │ │ └────────────┬───────────┘ │ ┌──────────────────────────┴──────────────────────────┐ │ │ │ │ ┌────────────▼───────────┐ ┌────────────▼───────────┐ │ │ │ │ │ ctx2(val:111111) │ │ ctx3(val:222222) ◀──────Step2 │ │ │ │ └────────────┬───────────┘ └────────────┬───────────┘ │ │ ┌─────────────┴────────────┐ ┌─────────────┴────────────┐ │ │ │ │ │ │ │ │ ┌────────────▼───────────┐ ┌────────────▼───────────┐ ┌────────────▼───────────┐ ┌────────────▼───────────┐ │ │ │ │ │ │ │ │ │ ctx4(val:333333) │ │ ctx5(val:444444) │ │ ctx6(val:555555) │ │ ctx7(val:666666) ◀──────Step1 │ │ │ │ │ │ │ │ └────────────────────────┘ └────────────────────────┘ └────────────────────────┘ └────────────────────────┘ ``` 当然如果调用挂载的节点越多,这棵树就越大,而遍历这棵树找value信息就越慢,事实上上找value信息就是通过往上递归遍历的方法来查找的。 ```go func (c *valueCtx) Value(key interface{}) interface{} { if c.key == key { return c.val } return c.Context.Value(key) } ``` 比如ctx7.Value(TrackId("2021"))就需要通过Step1,2,3才能找到最终的value:123456。 ### 总结 timerCtx,cancelCtx可以认为是管理goroutine生命周期的一类Ctx,另外valueCtx是传递参数作用的Ctx,使用的场景其实有区别,比较会误用的是valueCtx。这里摘取了一些使用中容易挖的坑,其实要使用好它,还真不容易!
Golang Chan 作者: nbboy 时间: 2021-05-10 分类: 软件架构,软件工程,Golang 评论 # Golang Chan > *Do not communicate by sharing memory; instead, share memory by communicating.* **分析版本:1.15.2** ### 设计 chan主要分为带缓存chan和不带缓存的chan,带缓存的chan设计是遵循FIFO特性的,在Go中大多数情况会使用chan通信。不带缓冲的chan,可以通过不指定cap的方式来创建,这种chan其实是阻塞的同步工具,如果没有另外一端去接收,或者关闭则阻塞在这里。 ```go ch :=make(chan struct{}) ``` 可以通过指定cap的值来让其变成一个带缓冲的chan ```go ch :=make(chan struct{}, 3) ch <- struct{}{} ``` 如上代码并不会让程序进入阻塞状态。有时候,可以利用chan的这一特性作goroutine之间的通信。 ```go func main() { ch := make(chan int, 3) go func() { for { select { case ele := <-ch: fmt.Println("value ", ele) return } } }() ch <- 1 time.Sleep(5 * time.Second) } ``` 一个读一个写,写入后是存在缓冲里的,在后面实现分析中能看到这一点。再来修改下代码,直接关闭chan。 ```go func main() { ch := make(chan int, 3) go func() { for { select { case ele, ok := <-ch: if !ok { fmt.Println("closed ") } else { fmt.Println("value ", ele) } return } } }() close(ch) //close(ch) panic time.Sleep(5 * time.Second) } ``` 通道已经被关闭,接收端能够感知到,需要注意的是,如果通道被关闭两次,第二次就会panic。 ### 安全关闭 ### 双向通道与单向通道 - `chan T` 表示双向通道 - `chan<- T` 表示只写通道 - `<-chan T` 表示只读通道 两个goroutine之间同步可以用chan,也可以用传统的同步语义,比如mutex,atomic,sema等等。 ### hchan结构与实现 chan的结构定义在chan.go文件里 ```go type hchan struct { qcount uint // 队列中总的数量 dataqsiz uint // 环形队列元素数量 buf unsafe.Pointer // 指向环形队列的元素首地址 elemsize uint16 closed uint32 elemtype *_type // 元素类型 sendx uint // 发送操作处理的位置 recvx uint // 接收操作处理的位置 recvq waitq // 接收等待者列表 sendq waitq // 发送等待者列表 lock mutex //用锁保护成员 } ``` 通过make去创建chan,实际上会调用chan.go->makechan函数 ```go func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. // 元素值的大小不能大于2的16次 if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 判断下总字节数,超过直接panic mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0://size如果是0,分配基本hchan就可以了 // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0://如果带缓冲的chan,不包含指针,就紧接着hchan分配了存储元素用的空间 // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize)//指向了元素的地址 default://如果包含指针,内存托管到gc管理 // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c } ``` - 如果元素个数是0(不指定)的情况下,只分配管理结构hchan的内存空间,就是不带缓冲的情况 - 如果元素类型不是指针,那就除了分配管理结构hchan,另外再分配一段连续内存给hchan,其紧挨hchan分配 - 如果元素类型是指针,分配hchan,另外再分配元素空间,而且hchan的内存给gc进行托管,这里具体不是很明白,到时候看gc内存的时候在回过来看 ##### Channel操作 根据操作和chan的各种情况先汇总一下 | **Operation** | **A Nil Channel** | **A Closed Channel** | **A Not-Closed Non-Nil Channel** | | ---------------------- | ----------------- | -------------------- | -------------------------------- | | **Close** | panic(I) | panic(F) | succeed to close(C) | | **Send Value To** | block for ever(H) | panic(E) | block or succeed to send(B) | | **Receive Value From** | block for ever(G) | never block(D) | block or succeed to receive(A) | 这个表格清晰得示意了各种情况,现在深入代码看看发送和接收,关闭。 ```go /* * generic single channel send/recv * If block is not nil, * then the protocol will not * sleep but return if it could * not complete. * * sleep can wake up with g.param == nil * when a channel involved in the sleep has * been closed. it is easiest to loop and re-run * the operation; we'll see that it's now closed. */ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block {//非阻塞模式直接返回 return false } //如果chan是nil,并且是阻塞模式下,则直接阻塞,这里对应表格中H那种情况 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } //... if !block && c.closed == 0 && full(c) {//fast path,如果是还没关闭的chan,并且已经缓冲已经满了,则直接返回false return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) //如果chan已经被close,则panic,对应表格中的E情况 if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } //如果有接受者在等待,则直接传递数据 if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). // 拷贝数据到调用者栈或者唤醒调用者 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } //如果缓冲队列里还有空间,则把数据移动到缓冲队列中 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } //如果不需要陷入阻塞模式,则返回 if !block { unlock(&c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. gp := getg() //获取一个sudog对象 mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) //阻塞当前g gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) //... releaseSudog(mysg) return true } ``` 根据接收者队列和buff队列的数据情况,chan发送可以分成三种情况。 - 如果有接受者在等待,则直接拷贝数据到接受者,并且唤醒接受者。 - 如果缓冲队列里还有空间,则把数据移动到缓冲队列中。 - 如果不是上面两种情况,并且chan是在同步模式,则g开始阻塞,等待接收者准备接收。 相应的接收流程也差不多,如下所示 ```go // chanrecv receives on channel c and writes the received data to ep. // ep may be nil, in which case received data is ignored. // If block == false and no elements are available, returns (false, false). // Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, fills in *ep with an element and returns (true, true). // A non-nil ep must point to the heap or the caller's stack. // 注释中说的很明白,ep可能为空,则接收的数据被忽略。如果block为flase,并且没有数据是有效的,则直接返回flase // 否则chan是被关闭,则也直接返回,最后一种情况就是正常的情况了,携带数据返回。 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { //... if c == nil { if !block { return } //接收者阻塞,对应G情况 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // Fast path: check for failed non-blocking operation without acquiring the lock. // 快速路径,在不阻塞情况下,判断是否空 if !block && empty(c) { // 多个goroutine重排情况下,会有问题,这里加锁保护下 if atomic.Load(&c.closed) == 0 { return } // 再次去检测chan是否为空 if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) //chan已经关闭,并且没有元素,则直接返回 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } //发送队列里有发送者在等待,则直接传递数据 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). // 找到一个发送者,如果buff size是0,则直接拷贝就可以,否则从buf 的队列头去读取,其实在满的情况下,tail和head指向的是同一个地方 // buf 这里一定是满的,想想为什么 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } //队列里面有元素,则从队列里面读取 if c.qcount > 0 { // Receive directly from queue // 移动指针到接收位置 qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } //如果不是阻塞模式,就直接返回 if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil //接受者进入接受者等待队列 c.recvq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) //... releaseSudog(mysg) return true, !closed } ``` 接收动作也存在三种情况 - 发送队列里有发送者在等待,则直接拷贝数据,并且唤醒发送者。 - buff队列里面有元素,则从队列里面读取。 - 如果不属于上面两种情况,并且是在同步模式下,则g开始阻塞,并且等待发送者准备发送。 close操作就是对数据的释放,包括发送等待者队列和接收等待者队列等资源。 ```go func closechan(c *hchan) { //如果对一个nil chan进行关闭,则panic,对应表里情况I if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) //如果对一个已经关闭的chan再次关闭,则panic,对应情况F if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } c.closed = 1 var glist gList // release all readers // 释放所有的接收者队列 for { sg := c.recvq.dequeue() if sg == nil { break } //清除传递的数据 if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // 释放所有的发送者队列 // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. // 把所有等待的g状态都改为ready状态 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } } ``` 通过阅读代码,可以看到: - 关闭一个nil chan和一个已经关闭的chan,则直接panic - 发送等待队列和接收等待队列都重置 - 所有等待队列都设置为Ready状态,意味着调度器可以调度这些G了 ##### 环形缓冲队列 假设创建了带缓冲的channel ```go ch := make(chan int, 6) ``` ch初始化后,数据位置是这样的,Sendx和Recvx都指向起点,即sendx=recvx=1 ``` | Sendx | | v +-------+-------+-------+-------+-------+-------+ | | | | | | | BUF | | | | | | | | | | | | | +-------+-------+-------+-------+-------+-------+ ^ | | Recvx | ``` 在向ch发送一个数据后,先判断qcount是否超过dataqsiz(qcount0),如果有数据,则读取recvx位置上的数据,并且recvx+=1,并且qcount-=1。同样的情况,如果recvx到达末尾,则也重置为0。 ``` | Sendx | | v +-------+-------+-------+-------+-------+-------+ | | | | | | | BUF | | | | | | | | | | | | | +-------+-------+-------+-------+-------+-------+ ^ | | Recvx | ``` ##### FIFO特性 接收的时候是如何保证FIFO特性的,看下相关代码 ```go // recv processes a receive operation on a full channel c. // There are 2 parts: // 1) The value sent by the sender sg is put into the channel // and the sender is woken up to go on its merry way. // 2) The value received by the receiver (the current G) is // written to ep. // For synchronous channels, both values are the same. // For asynchronous channels, the receiver gets its data from // the channel buffer and the sender's data is put in the // channel buffer. // Channel c must be full and locked. recv unlocks c with unlockf. // sg must already be dequeued from c. // A non-nil ep must point to the heap or the caller's stack. // chan 一定是满的情况下才会处理 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 {//同步chan,则直接拷贝 if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. // buf中接收数据的位置 qp := chanbuf(c, c.recvx) //... // copy data from queue to receiver // 从队列里,拷贝数据到接受者 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } //发送者设置为read状态,准备调度 goready(gp, skip+1) } ``` 可以看到这里也分成2种模式去处理 - 在同步模式下,则直接从发送者拷贝数据到接收者 - 在异步模式下,则会优先去buff head中拿数据,并且拷贝发送者数据到buff tail,并且指针都下移 用直观的图来示意一下异步模式下的操作过程,假设现在chan中存储了如下数据,buff中数据其实已经存储满了,并且指针sendx=recvx=1 ``` +--------------------------------+ |dataqsiz=6,qcount=6,sg.elem='g' | +--------------------------------+ | Sendx | | v +-------+-------+-------+-------+-------+-------+ | | | | | | | BUF f | a | b | c | d | e | | | | | | | | +-------+-------+-------+-------+-------+-------+ ^ | | Recvx | ``` 在操作之后,buff数据是这样的,相当于在头部执行了一次dequeue,然后在尾部执行一次enqueue ``` +--------------------------------+ |dataqsiz=6,qcount=6,sg.elem='g' | +--------------------------------+ | Sendx | | v +-------+-------+-------+-------+-------+-------+ | | | | | | | BUF f | g | b | c | d | e | | | | | | | | +-------+-------+-------+-------+-------+-------+ ^ | | Recvx | ``` ##### Refs Go_ Buffered and Unbuffered Channels _ by Vincent Blanchon _ A Journey With Go _ Medium Share Memory By Communicating - The Go Blog https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/ https://go101.org/article/channel-closing.html https://go101.org/article/channel-use-cases.html https://go101.org/article/channel.html