4.10 学习 Go 协程:万能的通道模型(公式) ======================================= .. image:: http://image.iswbm.com/20200607145423.png 通道,是 Go 语言的一大特色,初次接触 Go 语言时,着实被他给惊艳到了,它让协程之间的通信变得非常的简单。 简单,意味着灵活,按理说,这是好事啊,可你要知道灵活的另一个潜台词,就是不标准,不同的人,使用的方式可能都不一样。 对于 通道 来说,我们要极力避免下面两种情况: 1. 对一个已关闭的通道,进行关闭 2. 对一个已关闭的通道,写入数据 因此,这两种操作,都会异常程序触发 panic。 然而,Go 语言并没有提供一个内置的函数来判断一个通道是否关闭,这肯定不是技术上的限制,应该另有原因。 本篇文章是我基于《\ `如何优雅地关闭 channel? `__\ 》这篇文章,再进行总结而得出来一套万能的通道编程模型,非常建议大家全文精读后收为已用。 1. 有隐患且不优雅的做法 ----------------------- 对此,可能有的人会自己写个函数来判断函数是否关闭 比如,针对第一种情况,可以包装个 SafeClose 函数,用 defer recover 关闭通道可能引起的 panic .. code:: go func SafeClose(ch chan T) (justClosed bool) { defer func() { if recover() != nil { // 一个函数的返回结果可以在defer调用中修改。 justClosed = false } }() // 假设ch != nil。 close(ch) // 如果ch已关闭,则产生一个恐慌。 return true // <=> justClosed = true; return } 而对于第二种情况,同样可以包装个 SafeSend 函数,用 defer recover 给通道发送数据可能引起的 panic .. code:: go func SafeSend(ch chan T, value T) (closed bool) { defer func() { if recover() != nil { closed = true } }() ch <- value // 如果ch已关闭,则产生一个恐慌。 return false // <=> closed = false; return } 上面这两种方法,虽然可以在一定程度上,可以解决通道操作的 panic 问题,但仍然存在隐患,比如下面这个例子, .. code:: go package main func SafeSend(ch chan bool, value bool) (closed bool) { defer func() { if recover() != nil { closed = true } }() ch <- value // 如果ch已关闭,则产生一个恐慌。 return false // <=> closed = false; return } func SafeClose(ch chan bool) (justClosed bool) { defer func() { if recover() != nil { // 一个函数的返回结果可以在defer调用中修改。 justClosed = false } }() // 假设ch != nil。 close(ch) // 如果ch已关闭,则产生一个恐慌。 return true // <=> justClosed = true; return } func main() { c := make(chan bool) go func() { for i := 0; i < 25; i++ { SafeSend(c, true) } }() SafeClose(c) } 直接 ``go run`` 是没有问题的,但你要是加个 ``-race`` ,Go 编译器就会替你排查数据竞争的隐患 .. image:: http://image.iswbm.com/20220109225820.png 2. 万能的通道编程模型 --------------------- 重新梳理一下思路,可以发现: 1. (发送者)对一个已关闭的通道,写入数据 ❌ 2. (接收者)对一个已关闭的通道,读取数据 ✅ 那么如果能保证发送者本身知道通道是关闭的,它就不会再傻傻地往一个已关闭的通道发送数据了。 一个发送者,N 个接收者 ~~~~~~~~~~~~~~~~~~~~~~ 关键问题是,发送者如何知道呢? Go 语言本身没有提供类似的函数,同时上一节,咱们也探讨了,使用 recover panic 的方式封装函数,同样会有数据竞争的问题。 语言层面不可行,那么就由开发者约定协议。 - 通道应当由唯一发送者关闭 - 若没有唯一发送者,则需要加“管理角色”的通道 第一点很好理解:当只有一个发送者时,他自己本身肯定是知道通道是否关闭,就不用再判断是否关闭了,自己想关闭就关闭,完全没事。 可要是没有唯一发送者呢? 这又要分两种情况了。 1. 多个发送者,一个接收者 2. 多个发送者,多个接收者 无论哪种场景,都会有数据竞争的问题。 上面我也说了,对于没有唯一发送者的方案就是加一个 “管理角色” 的通道。 为了方便解释,我将通道分为两种: 1. **业务通道**\ :承载数据,用于多个协程间共享数据 2. **管理通道**\ :仅为了标记业务通道是否关闭而存在 因此管理通道需要满足两个条件: **第一个条件:具备广播功能** 那只能是无缓冲通道(关闭后,所有 read 该通道的所有协程,都能明确的知道该通道已关闭)。 - 当该管理通道关闭了,说明业务通道也关闭了。 - 当该管理通道阻塞了,说明业务通道还没关闭。 **第二个条件:有唯一发送者** 这个开发者非常容易实现: - 对于多个发送者,一个接收者的场景,业务通道的这个接收者,就可以充当管理通道的 **唯一发送者** - 对于多个发送者,多个接收者的场景,就需要再单独开启一个媒介协程做 **唯一发送者** 针对这两个场景,这边分别举个例子 N个发送者,一个接收者 ~~~~~~~~~~~~~~~~~~~~~ 首先是多个发送者,一个接收者 .. code:: go package main import ( "math/rand" "sync" "time" ) func main() { rand.Seed(time.Now().UnixNano()) const Max = 100000 const NumSenders = 1000 wg := sync.WaitGroup{} wg.Add(1) // 业务通道 dataCh := make(chan int) // 管理通道:必须是无缓冲通道 // 其发送者是 业务通道的接收者。 // 其接收者是 业务通道的发送者。 stopCh := make(chan struct{}) // 业务通道的发送者 for i := 0; i < NumSenders; i++ { go func() { for { // 提前检查管理通道是否关闭 // 让业务通道发送者早尽量退出 select { case <- stopCh: return default: } select { case <- stopCh: return case dataCh <- rand.Intn(Max): } } }() } // 业务通道的接收者,亦充当管理通道的发送者 go func() { defer wg.Done() for value := range dataCh { if value == 6666 { // 当达到某个条件时 // 通过关闭管理通道来广播给所有业务通道的发送者 close(stopCh) return } } }() wg.Wait() } N个发送者,N个接收者 ~~~~~~~~~~~~~~~~~~~~ 然后是多个发送者,多个接收者,这个场景需要另外开启一个媒介协程。 媒介协程的作用,很明显啊,就是充当媒介,媒介要有自己的一个媒介通道: - 其发送者是:业务通道的所有发送者和接收者 - 其接收者是:媒介协程(是唯一的) 既然媒介协程只有一个,那自然而然地,媒介协程做为管理通道的 **唯一发送者**\ ,再合适不过了。 还有一个非常重要的点是,媒介协程要是媒介通道的接收者,因此它要先于业务通道的所有发送者、接收者启动。 这就要求,媒介通道,必须是缓冲通道,长度可以取 1 即可。 完整的示例代码如下 .. code:: go package main import ( "fmt" "math/rand" "strconv" "sync" "time" ) func main() { rand.Seed(time.Now().UnixNano()) const Max = 100000 const NumReceivers = 10 const NumSenders = 1000 wg := sync.WaitGroup{} wg.Add(NumReceivers) // 1. 业务通道 dataCh := make(chan int) // 2. 管理通道:必须是无缓冲通道 // 其发送者是:额外启动的管理协程 // 其接收者是:所有业务通道的发送者。 stopCh := make(chan struct{}) // 3. 媒介通道:必须是缓冲通道 // 其发送者是:业务通道的所有发送者和接收者 // 其接收者是:媒介协程(唯一) toStop := make(chan string, 1) var stoppedBy string // 媒介协程 go func() { stoppedBy = <-toStop close(stopCh) }() // 业务通道发送者 for i := 0; i < NumSenders; i++ { go func(id string) { for { // 提前检查管理通道是否关闭 // 让业务通道发送者早尽量退出 select { case <- stopCh: return default: } value := rand.Intn(Max) select { case <-stopCh: return case dataCh <- value: } } }(strconv.Itoa(i)) } // 业务通道的接收者 for i := 0; i < NumReceivers; i++ { go func(id string) { defer wg.Done() for { // 提前检查管理通道是否关闭 // 让业务通道接收者早尽量退出 select { case <- stopCh: return default: } select { case <- stopCh: return case value := <-dataCh: // 一旦满足某个条件,就通过媒介通道发消息给媒介协程 // 以关闭管理通道的形式,广播给所有业务通道的协程退出 if value == 6666 { // 务必使用 select,两个目的: // 1、防止协程阻塞 // 2、防止向已关闭的通道发送数据导致panic select { case toStop <- "接收者#" + id: default: } return } } } }(strconv.Itoa(i)) } wg.Wait() fmt.Println("被" + stoppedBy + "终止了") } 无论是直接运行,还是加 ``-race`` 都没有问题了。 .. image:: http://image.iswbm.com/20220110215204.png 可能会有的人会有疑问,为什么最后一个例子里,业务通道没有关闭呢? 我觉得有必要讲一下这个,不然有的朋友可能会绕不出来。 我们的最终目的其实不是关闭业务通道,而是让业务通道相关的协程能够正常退出。 业务通道其实并不都要去关闭它,多关闭一个就多一分风险,何必呢? 一旦所有的协程正常退出了,Go 的垃圾回收自然会清理掉,这样是不是更省事呢? 3. 总结一下 ----------- 通道是 Go 语言的一等公民,用 Go 写应用几乎都绕不开通道,但在多协程并发的面前,人的逻辑会显得非常脆弱,当你正得意于自己写的代码跑得非常顺利的时候,\ **通道** 经常会给你带来意料不到的惊喜,而这种问题如果在一开始程序设计时没有注意到,等到问题出现后,可能会面临代码结构的重新设计。 本文是我基于 《\ `如何优雅地关闭 channel? `__\ 》这篇文章,经过提炼总结出一套认为比较好理解的通道编程模型。 文中我造了一些新词,诸如 **业务通道**\ 、\ **管理通道**\ 、\ **媒介通道**\ 、\ **媒介协程**\ ,仅是为个人理解方便而造出的新概念,这里需要说明一下,希望不会对你给你的学习造成困扰。 这里再次总结一下,这个 **万能的通道编程模型**\ 。 1. 当只有一个发送者时,无论有多少接收者,业务通道都应由唯一发送者关闭。 2. 当有多个发送者,一个接收者时,应借助管理通道,由业务通道唯一接收者充当管理通道的发送者,其他业务通道的发送者充当接收者 3. 当有多个发送者,多个接收者时,这是最复杂的,不仅要管理通道,还要另起一个专门的媒介协程,新增一个媒介通道,但核心逻辑都是一样。