4.10 学习 Go 协程:万能的通道模型(公式)

http://image.iswbm.com/20200607145423.png

通道,是 Go 语言的一大特色,初次接触 Go 语言时,着实被他给惊艳到了,它让协程之间的通信变得非常的简单。

简单,意味着灵活,按理说,这是好事啊,可你要知道灵活的另一个潜台词,就是不标准,不同的人,使用的方式可能都不一样。

对于 通道 来说,我们要极力避免下面两种情况:

  1. 对一个已关闭的通道,进行关闭

  2. 对一个已关闭的通道,写入数据

因此,这两种操作,都会异常程序触发 panic。

然而,Go 语言并没有提供一个内置的函数来判断一个通道是否关闭,这肯定不是技术上的限制,应该另有原因。

本篇文章是我基于《如何优雅地关闭 channel?》这篇文章,再进行总结而得出来一套万能的通道编程模型,非常建议大家全文精读后收为已用。

1. 有隐患且不优雅的做法

对此,可能有的人会自己写个函数来判断函数是否关闭

比如,针对第一种情况,可以包装个 SafeClose 函数,用 defer recover 关闭通道可能引起的 panic

func SafeClose(ch chan T) (justClosed bool) {
    defer func() {
        if recover() != nil {
            // 一个函数的返回结果可以在defer调用中修改。
            justClosed = false
        }
    }()

    // 假设ch != nil。
    close(ch)   // 如果ch已关闭,则产生一个恐慌。
    return true // <=> justClosed = true; return
}

而对于第二种情况,同样可以包装个 SafeSend 函数,用 defer recover 给通道发送数据可能引起的 panic

func SafeSend(ch chan T, value T) (closed bool) {
    defer func() {
        if recover() != nil {
            closed = true
        }
    }()

    ch <- value  // 如果ch已关闭,则产生一个恐慌。
    return false // <=> closed = false; return
}

上面这两种方法,虽然可以在一定程度上,可以解决通道操作的 panic 问题,但仍然存在隐患,比如下面这个例子,

package main

func SafeSend(ch chan bool, value bool) (closed bool) {
    defer func() {
        if recover() != nil {
            closed = true
        }
    }()

    ch <- value  // 如果ch已关闭,则产生一个恐慌。
    return false // <=> closed = false; return
}

func SafeClose(ch chan bool) (justClosed bool) {
    defer func() {
        if recover() != nil {
            // 一个函数的返回结果可以在defer调用中修改。
            justClosed = false
        }
    }()

    // 假设ch != nil。
    close(ch)   // 如果ch已关闭,则产生一个恐慌。
    return true // <=> justClosed = true; return
}

func main() {
    c := make(chan bool)
    go func() {
        for i := 0; i < 25; i++ {
            SafeSend(c, true)
        }
    }()
    SafeClose(c)
}

直接 go run 是没有问题的,但你要是加个 -race ,Go 编译器就会替你排查数据竞争的隐患

http://image.iswbm.com/20220109225820.png

2. 万能的通道编程模型

重新梳理一下思路,可以发现:

  1. (发送者)对一个已关闭的通道,写入数据 ❌

  2. (接收者)对一个已关闭的通道,读取数据 ✅

那么如果能保证发送者本身知道通道是关闭的,它就不会再傻傻地往一个已关闭的通道发送数据了。

一个发送者,N 个接收者

关键问题是,发送者如何知道呢?

Go 语言本身没有提供类似的函数,同时上一节,咱们也探讨了,使用 recover panic 的方式封装函数,同样会有数据竞争的问题。

语言层面不可行,那么就由开发者约定协议。

  • 通道应当由唯一发送者关闭

  • 若没有唯一发送者,则需要加“管理角色”的通道

第一点很好理解:当只有一个发送者时,他自己本身肯定是知道通道是否关闭,就不用再判断是否关闭了,自己想关闭就关闭,完全没事。

可要是没有唯一发送者呢?

这又要分两种情况了。

  1. 多个发送者,一个接收者

  2. 多个发送者,多个接收者

无论哪种场景,都会有数据竞争的问题。

上面我也说了,对于没有唯一发送者的方案就是加一个 “管理角色” 的通道。

为了方便解释,我将通道分为两种:

  1. 业务通道:承载数据,用于多个协程间共享数据

  2. 管理通道:仅为了标记业务通道是否关闭而存在

因此管理通道需要满足两个条件:

第一个条件:具备广播功能

那只能是无缓冲通道(关闭后,所有 read 该通道的所有协程,都能明确的知道该通道已关闭)。

  • 当该管理通道关闭了,说明业务通道也关闭了。

  • 当该管理通道阻塞了,说明业务通道还没关闭。

第二个条件:有唯一发送者

这个开发者非常容易实现:

  • 对于多个发送者,一个接收者的场景,业务通道的这个接收者,就可以充当管理通道的 唯一发送者

  • 对于多个发送者,多个接收者的场景,就需要再单独开启一个媒介协程做 唯一发送者

针对这两个场景,这边分别举个例子

N个发送者,一个接收者

首先是多个发送者,一个接收者

package main

import (
    "math/rand"
    "sync"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    const Max = 100000
    const NumSenders = 1000

    wg := sync.WaitGroup{}
    wg.Add(1)

    // 业务通道
    dataCh := make(chan int)

    // 管理通道:必须是无缓冲通道
    // 其发送者是 业务通道的接收者。
    // 其接收者是 业务通道的发送者。
    stopCh := make(chan struct{})

    // 业务通道的发送者
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                // 提前检查管理通道是否关闭
                // 让业务通道发送者早尽量退出
                select {
                case <- stopCh:
                    return
                default:
                }

                select {
                case <- stopCh:
                    return
                case dataCh <- rand.Intn(Max):
                }
            }
        }()
    }

    // 业务通道的接收者,亦充当管理通道的发送者
    go func() {
        defer wg.Done()

        for value := range dataCh {
            if value == 6666 {
                // 当达到某个条件时
                // 通过关闭管理通道来广播给所有业务通道的发送者
                close(stopCh)
                return
            }
        }
    }()

    wg.Wait()
}

N个发送者,N个接收者

然后是多个发送者,多个接收者,这个场景需要另外开启一个媒介协程。

媒介协程的作用,很明显啊,就是充当媒介,媒介要有自己的一个媒介通道:

  • 其发送者是:业务通道的所有发送者和接收者

  • 其接收者是:媒介协程(是唯一的)

既然媒介协程只有一个,那自然而然地,媒介协程做为管理通道的 唯一发送者,再合适不过了。

还有一个非常重要的点是,媒介协程要是媒介通道的接收者,因此它要先于业务通道的所有发送者、接收者启动。

这就要求,媒介通道,必须是缓冲通道,长度可以取 1 即可。

完整的示例代码如下

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "sync"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    const Max = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    wg := sync.WaitGroup{}
    wg.Add(NumReceivers)

    // 1. 业务通道
    dataCh := make(chan int)

    // 2. 管理通道:必须是无缓冲通道
    // 其发送者是:额外启动的管理协程
    // 其接收者是:所有业务通道的发送者。
    stopCh := make(chan struct{})

    // 3. 媒介通道:必须是缓冲通道
    // 其发送者是:业务通道的所有发送者和接收者
    // 其接收者是:媒介协程(唯一)
    toStop := make(chan string, 1)

    var stoppedBy string

    // 媒介协程
    go func() {
        stoppedBy = <-toStop
        close(stopCh)
    }()

    // 业务通道发送者
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                // 提前检查管理通道是否关闭
                // 让业务通道发送者早尽量退出
                select {
                case <- stopCh:
                    return
                default:
                }

                value := rand.Intn(Max)
                select {
                case <-stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // 业务通道的接收者
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            defer wg.Done()

            for {
                // 提前检查管理通道是否关闭
                // 让业务通道接收者早尽量退出
                select {
                case <- stopCh:
                    return
                default:
                }

                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    // 一旦满足某个条件,就通过媒介通道发消息给媒介协程
                    // 以关闭管理通道的形式,广播给所有业务通道的协程退出
                    if value == 6666 {
                        // 务必使用 select,两个目的:
                        // 1、防止协程阻塞
                        // 2、防止向已关闭的通道发送数据导致panic
                        select {
                        case toStop <- "接收者#" + id:
                        default:
                        }
                        return
                    }

                }
            }
        }(strconv.Itoa(i))
    }

    wg.Wait()
    fmt.Println("被" + stoppedBy + "终止了")
}

无论是直接运行,还是加 -race 都没有问题了。

http://image.iswbm.com/20220110215204.png

可能会有的人会有疑问,为什么最后一个例子里,业务通道没有关闭呢?

我觉得有必要讲一下这个,不然有的朋友可能会绕不出来。

我们的最终目的其实不是关闭业务通道,而是让业务通道相关的协程能够正常退出。

业务通道其实并不都要去关闭它,多关闭一个就多一分风险,何必呢?

一旦所有的协程正常退出了,Go 的垃圾回收自然会清理掉,这样是不是更省事呢?

3. 总结一下

通道是 Go 语言的一等公民,用 Go 写应用几乎都绕不开通道,但在多协程并发的面前,人的逻辑会显得非常脆弱,当你正得意于自己写的代码跑得非常顺利的时候,通道 经常会给你带来意料不到的惊喜,而这种问题如果在一开始程序设计时没有注意到,等到问题出现后,可能会面临代码结构的重新设计。

本文是我基于 《如何优雅地关闭 channel?》这篇文章,经过提炼总结出一套认为比较好理解的通道编程模型。

文中我造了一些新词,诸如 业务通道管理通道媒介通道媒介协程,仅是为个人理解方便而造出的新概念,这里需要说明一下,希望不会对你给你的学习造成困扰。

这里再次总结一下,这个 万能的通道编程模型

  1. 当只有一个发送者时,无论有多少接收者,业务通道都应由唯一发送者关闭。

  2. 当有多个发送者,一个接收者时,应借助管理通道,由业务通道唯一接收者充当管理通道的发送者,其他业务通道的发送者充当接收者

  3. 当有多个发送者,多个接收者时,这是最复杂的,不仅要管理通道,还要另起一个专门的媒介协程,新增一个媒介通道,但核心逻辑都是一样。