4.10 学习 Go 协程:万能的通道模型(公式)¶
通道,是 Go 语言的一大特色,初次接触 Go 语言时,着实被他给惊艳到了,它让协程之间的通信变得非常的简单。
简单,意味着灵活,按理说,这是好事啊,可你要知道灵活的另一个潜台词,就是不标准,不同的人,使用的方式可能都不一样。
对于 通道 来说,我们要极力避免下面两种情况:
对一个已关闭的通道,进行关闭
对一个已关闭的通道,写入数据
因此,这两种操作,都会异常程序触发 panic。
然而,Go 语言并没有提供一个内置的函数来判断一个通道是否关闭,这肯定不是技术上的限制,应该另有原因。
本篇文章是我基于《如何优雅地关闭 channel?》这篇文章,再进行总结而得出来一套万能的通道编程模型,非常建议大家全文精读后收为已用。
1. 有隐患且不优雅的做法¶
对此,可能有的人会自己写个函数来判断函数是否关闭
比如,针对第一种情况,可以包装个 SafeClose 函数,用 defer recover 关闭通道可能引起的 panic
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
// 一个函数的返回结果可以在defer调用中修改。
justClosed = false
}
}()
// 假设ch != nil。
close(ch) // 如果ch已关闭,则产生一个恐慌。
return true // <=> justClosed = true; return
}
而对于第二种情况,同样可以包装个 SafeSend 函数,用 defer recover 给通道发送数据可能引起的 panic
func SafeSend(ch chan T, value T) (closed bool) {
defer func() {
if recover() != nil {
closed = true
}
}()
ch <- value // 如果ch已关闭,则产生一个恐慌。
return false // <=> closed = false; return
}
上面这两种方法,虽然可以在一定程度上,可以解决通道操作的 panic 问题,但仍然存在隐患,比如下面这个例子,
package main
func SafeSend(ch chan bool, value bool) (closed bool) {
defer func() {
if recover() != nil {
closed = true
}
}()
ch <- value // 如果ch已关闭,则产生一个恐慌。
return false // <=> closed = false; return
}
func SafeClose(ch chan bool) (justClosed bool) {
defer func() {
if recover() != nil {
// 一个函数的返回结果可以在defer调用中修改。
justClosed = false
}
}()
// 假设ch != nil。
close(ch) // 如果ch已关闭,则产生一个恐慌。
return true // <=> justClosed = true; return
}
func main() {
c := make(chan bool)
go func() {
for i := 0; i < 25; i++ {
SafeSend(c, true)
}
}()
SafeClose(c)
}
直接 go run
是没有问题的,但你要是加个 -race
,Go
编译器就会替你排查数据竞争的隐患
2. 万能的通道编程模型¶
重新梳理一下思路,可以发现:
(发送者)对一个已关闭的通道,写入数据 ❌
(接收者)对一个已关闭的通道,读取数据 ✅
那么如果能保证发送者本身知道通道是关闭的,它就不会再傻傻地往一个已关闭的通道发送数据了。
一个发送者,N 个接收者¶
关键问题是,发送者如何知道呢?
Go 语言本身没有提供类似的函数,同时上一节,咱们也探讨了,使用 recover panic 的方式封装函数,同样会有数据竞争的问题。
语言层面不可行,那么就由开发者约定协议。
通道应当由唯一发送者关闭
若没有唯一发送者,则需要加“管理角色”的通道
第一点很好理解:当只有一个发送者时,他自己本身肯定是知道通道是否关闭,就不用再判断是否关闭了,自己想关闭就关闭,完全没事。
可要是没有唯一发送者呢?
这又要分两种情况了。
多个发送者,一个接收者
多个发送者,多个接收者
无论哪种场景,都会有数据竞争的问题。
上面我也说了,对于没有唯一发送者的方案就是加一个 “管理角色” 的通道。
为了方便解释,我将通道分为两种:
业务通道:承载数据,用于多个协程间共享数据
管理通道:仅为了标记业务通道是否关闭而存在
因此管理通道需要满足两个条件:
第一个条件:具备广播功能
那只能是无缓冲通道(关闭后,所有 read 该通道的所有协程,都能明确的知道该通道已关闭)。
当该管理通道关闭了,说明业务通道也关闭了。
当该管理通道阻塞了,说明业务通道还没关闭。
第二个条件:有唯一发送者
这个开发者非常容易实现:
对于多个发送者,一个接收者的场景,业务通道的这个接收者,就可以充当管理通道的 唯一发送者
对于多个发送者,多个接收者的场景,就需要再单独开启一个媒介协程做 唯一发送者
针对这两个场景,这边分别举个例子
N个发送者,一个接收者¶
首先是多个发送者,一个接收者
package main
import (
"math/rand"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000
const NumSenders = 1000
wg := sync.WaitGroup{}
wg.Add(1)
// 业务通道
dataCh := make(chan int)
// 管理通道:必须是无缓冲通道
// 其发送者是 业务通道的接收者。
// 其接收者是 业务通道的发送者。
stopCh := make(chan struct{})
// 业务通道的发送者
for i := 0; i < NumSenders; i++ {
go func() {
for {
// 提前检查管理通道是否关闭
// 让业务通道发送者早尽量退出
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case dataCh <- rand.Intn(Max):
}
}
}()
}
// 业务通道的接收者,亦充当管理通道的发送者
go func() {
defer wg.Done()
for value := range dataCh {
if value == 6666 {
// 当达到某个条件时
// 通过关闭管理通道来广播给所有业务通道的发送者
close(stopCh)
return
}
}
}()
wg.Wait()
}
N个发送者,N个接收者¶
然后是多个发送者,多个接收者,这个场景需要另外开启一个媒介协程。
媒介协程的作用,很明显啊,就是充当媒介,媒介要有自己的一个媒介通道:
其发送者是:业务通道的所有发送者和接收者
其接收者是:媒介协程(是唯一的)
既然媒介协程只有一个,那自然而然地,媒介协程做为管理通道的 唯一发送者,再合适不过了。
还有一个非常重要的点是,媒介协程要是媒介通道的接收者,因此它要先于业务通道的所有发送者、接收者启动。
这就要求,媒介通道,必须是缓冲通道,长度可以取 1 即可。
完整的示例代码如下
package main
import (
"fmt"
"math/rand"
"strconv"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
wg := sync.WaitGroup{}
wg.Add(NumReceivers)
// 1. 业务通道
dataCh := make(chan int)
// 2. 管理通道:必须是无缓冲通道
// 其发送者是:额外启动的管理协程
// 其接收者是:所有业务通道的发送者。
stopCh := make(chan struct{})
// 3. 媒介通道:必须是缓冲通道
// 其发送者是:业务通道的所有发送者和接收者
// 其接收者是:媒介协程(唯一)
toStop := make(chan string, 1)
var stoppedBy string
// 媒介协程
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
// 业务通道发送者
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
// 提前检查管理通道是否关闭
// 让业务通道发送者早尽量退出
select {
case <- stopCh:
return
default:
}
value := rand.Intn(Max)
select {
case <-stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// 业务通道的接收者
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wg.Done()
for {
// 提前检查管理通道是否关闭
// 让业务通道接收者早尽量退出
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case value := <-dataCh:
// 一旦满足某个条件,就通过媒介通道发消息给媒介协程
// 以关闭管理通道的形式,广播给所有业务通道的协程退出
if value == 6666 {
// 务必使用 select,两个目的:
// 1、防止协程阻塞
// 2、防止向已关闭的通道发送数据导致panic
select {
case toStop <- "接收者#" + id:
default:
}
return
}
}
}
}(strconv.Itoa(i))
}
wg.Wait()
fmt.Println("被" + stoppedBy + "终止了")
}
无论是直接运行,还是加 -race
都没有问题了。
可能会有的人会有疑问,为什么最后一个例子里,业务通道没有关闭呢?
我觉得有必要讲一下这个,不然有的朋友可能会绕不出来。
我们的最终目的其实不是关闭业务通道,而是让业务通道相关的协程能够正常退出。
业务通道其实并不都要去关闭它,多关闭一个就多一分风险,何必呢?
一旦所有的协程正常退出了,Go 的垃圾回收自然会清理掉,这样是不是更省事呢?
3. 总结一下¶
通道是 Go 语言的一等公民,用 Go 写应用几乎都绕不开通道,但在多协程并发的面前,人的逻辑会显得非常脆弱,当你正得意于自己写的代码跑得非常顺利的时候,通道 经常会给你带来意料不到的惊喜,而这种问题如果在一开始程序设计时没有注意到,等到问题出现后,可能会面临代码结构的重新设计。
本文是我基于 《如何优雅地关闭 channel?》这篇文章,经过提炼总结出一套认为比较好理解的通道编程模型。
文中我造了一些新词,诸如 业务通道、管理通道、媒介通道、媒介协程,仅是为个人理解方便而造出的新概念,这里需要说明一下,希望不会对你给你的学习造成困扰。
这里再次总结一下,这个 万能的通道编程模型。
当只有一个发送者时,无论有多少接收者,业务通道都应由唯一发送者关闭。
当有多个发送者,一个接收者时,应借助管理通道,由业务通道唯一接收者充当管理通道的发送者,其他业务通道的发送者充当接收者
当有多个发送者,多个接收者时,这是最复杂的,不仅要管理通道,还要另起一个专门的媒介协程,新增一个媒介通道,但核心逻辑都是一样。