您正在执行的是扇出模式,也就是说,多个端点正在侦听单个输入源。这种模式的结果是,只要输入源中有消息,这些侦听器中只有一个能够获取消息。唯一的例外是
close渠道。
close所有的听众都将认识到这一点,因此是“广播”。
但是您想要做的是广播从连接读取的消息,因此我们可以执行以下操作:
何时知道听众人数
让每个工作人员收听专用广播频道,并将消息从主频道分发到每个专用广播频道。
type worker struct { source chan interface{} quit chan struct{}}func (w *worker) Start() { w.source = make(chan interface{}, 10) // some buffer size to avoid blocking go func() { for { select { case msg := <-w.source // do something with msg case <-quit: // will explain this in the last section return } } }()}然后我们可能会有很多工人:
workers := []*worker{&worker{}, &worker{}}for _, worker := range workers { worker.Start() }然后启动我们的监听器:
go func() {for { conn, _ := listener.Accept() ch <- conn }}()和调度员:
go func() { for { msg := <- ch for _, worker := workers { worker.source <- msg } }}()未知的听众人数
在这种情况下,上面给出的解决方案仍然有效。唯一的区别是,每当需要新工作人员时,都需要创建一个新工作人员,将其启动,然后将其推入
workers切片。但是此方法需要一个线程安全的切片,该切片周围需要一个锁。一种实现可能如下所示:
type threadSafeSlice struct { sync.Mutex workers []*worker}func (slice *threadSafeSlice) Push(w *worker) { slice.Lock() defer slice.Unlock() workers = append(workers, w)}func (slice *threadSafeSlice) Iter(routine func(*worker)) { slice.Lock() defer slice.Unlock() for _, worker := range workers { routine(worker) }}每当您想开始工作时:
w := &worker{}w.Start()threadSafeSlice.Push(w)您的调度员将更改为:
go func() { for { msg := <- ch threadSafeSlice.Iter(func(w *worker) { w.source <- msg }) }}()遗言:永远不要离开悬空的goroutine
好的做法之一是:永远不要离开悬空的goroutine。因此,当您听完后,需要关闭所有触发的goroutine。这将通过以下
quit渠道进行
worker:
首先,我们需要创建一个全局
quit信令通道:
globalQuit := make(chan struct{})每当我们创建一个worker时,我们都会为其分配
globalQuit通道作为其退出信号:
worker.quit = globalQuit
然后,当我们要关闭所有工作程序时,我们只需执行以下操作:
close(globalQuit)
由于
close所有侦听的goroutine都可以识别(这是您理解的重点),因此将返回所有goroutine。记住也要关闭调度程序,但我会留给您:)



