您可以使用
context为此类事情创建的软件包(
“带有截止日期,取消信号…” )。
您创建了一个能够发布带有的取消信号的
context.WithCancel()上下文(父上下文可能是所返回的上下文
context.Background())。这将为您返回一个
cancel()函数,该函数可用于取消(或更准确地
发出 取消意图)给辅助goroutines。
并且在worker
goroutine中,您必须通过检查返回的通道
Context.Done()是否关闭来检查是否已经启动了该意图,这是通过尝试从其接收消息(如果关闭该通道将立即进行)最简单的方法。并执行非阻塞检查(因此如果未关闭,则可以继续执行),将
select语句与
default分支一起使用。
我将使用以下
work()实现,该实现模拟10%的失败机会,并模拟1秒的工作:
func work(i int) (int, error) { if rand.Intn(100) < 10 { // 10% of failure return 0, errors.New("random error") } time.Sleep(time.Second) return 100 + i, nil}和
doAllWork()可能看起来像这样:
func doAllWork() error { var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure it's called to release resources even if no errors for i := 0; i < 2; i++ { wg.Add(1) go func(i int) { defer wg.Done() for j := 0; j < 10; j++ { // Check if any error occurred in any other gorouties: select { case <-ctx.Done(): return // Error somewhere, terminate default: // Default is must to avoid blocking } result, err := work(j) if err != nil { fmt.Printf("Worker #%d during %d, error: %vn", i, j, err) cancel() return } fmt.Printf("Worker #%d finished %d, result: %d.n", i, j, result) } }(i) } wg.Wait() return ctx.Err()}这是可以如何测试的方法:
func main() { rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed fmt.Printf("doAllWork: %vn", doAllWork())}输出(在Go Playground上尝试):
Worker #0 finished 0, result: 100.Worker #1 finished 0, result: 100.Worker #1 finished 1, result: 101.Worker #0 finished 1, result: 101.Worker #0 finished 2, result: 102.Worker #1 finished 2, result: 102.Worker #1 finished 3, result: 103.Worker #1 during 4, error: random errorWorker #0 finished 3, result: 103.doAllWork: context canceled
如果没有错误,例如使用以下
work()功能时:
func work(i int) (int, error) { time.Sleep(time.Second) return 100 + i, nil}输出就像(在Go Playground上尝试):
Worker #0 finished 0, result: 100.Worker #1 finished 0, result: 100.Worker #1 finished 1, result: 101.Worker #0 finished 1, result: 101.Worker #0 finished 2, result: 102.Worker #1 finished 2, result: 102.Worker #1 finished 3, result: 103.Worker #0 finished 3, result: 103.Worker #0 finished 4, result: 104.Worker #1 finished 4, result: 104.Worker #1 finished 5, result: 105.Worker #0 finished 5, result: 105.Worker #0 finished 6, result: 106.Worker #1 finished 6, result: 106.Worker #1 finished 7, result: 107.Worker #0 finished 7, result: 107.Worker #0 finished 8, result: 108.Worker #1 finished 8, result: 108.Worker #1 finished 9, result: 109.Worker #0 finished 9, result: 109.doAllWork: <nil>
笔记:
基本上,我们只是使用了
Done()上下文的通道,因此似乎可以轻松地(如果不是更简单)使用
done通道而不是
Context,关闭通道即可完成
cancel()上述解决方案中的工作。
这不是真的。 仅当只有一个goroutine可以关闭通道时才可以使用此方法,但是在我们的情况下,任何工人都可以这样做。
并尝试关闭已经关闭的通道恐慌。因此,您必须确保围绕进行某种同步/排除
close(done),这会使它的可读性降低,甚至变得更加复杂。实际上,这恰恰是
cancel()函数在后台执行的功能
cancel(),使您的代码隐藏/抽象,因此可以多次调用它,以使您的代码/使用起来更简单。
如何从工人那里获取和返还错误?
为此,您可以使用错误通道:
errs := make(chan error, 2) // Buffer for 2 errors
并且在工作人员内部遇到错误时,请在通道上发送该错误,而不是打印该错误:
result, err := work(j)if err != nil { errs <- fmt.Errorf("Worker #%d during %d, error: %vn", i, j, err) cancel() return}在循环之后,如果有错误,请返回该错误(
nil否则):
// Return (first) error, if any:if ctx.Err() != nil { return <-errs}return nil这次输出(在Go Playground上尝试):
Worker #0 finished 0, result: 100.Worker #1 finished 0, result: 100.Worker #1 finished 1, result: 101.Worker #0 finished 1, result: 101.Worker #0 finished 2, result: 102.Worker #1 finished 2, result: 102.Worker #1 finished 3, result: 103.Worker #0 finished 3, result: 103.doAllWork: Worker #1 during 4, error: random error
请注意,我使用的缓冲区通道的缓冲区大小等于工作线程数,以确保在该通道上进行发送始终是非阻塞的。这也使您可以接收和处理所有错误,而不仅仅是一个错误(例如第一个错误)。另一种选择是使用缓冲通道仅保留1,并在其上进行非阻塞发送,如下所示:
errs := make(chan error, 1) // Buffered only for the first error// ...and inside the worker:result, err := work(j)if err != nil { // Non-blocking send: select { case errs <- fmt.Errorf("Worker #%d during %d, error: %vn", i, j, err): default: } cancel() return}


