dtmm0148603 2017-08-04 07:37
Views: 71
Accepted

Close multiple goroutines if an error occurs in one

Consider this function:

func doAllWork() error {
    var wg sync.WaitGroup
    wg.Add(2) // one per goroutine started in the loop below
    for i := 0; i < 2; i++ {
        go func() {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                _, err := work(j) // the result is not used in this snippet
                if err != nil {
                    // can't use `return err` here
                    // what should I put instead?
                    os.Exit(0)
                }
            }
        }()
    }
    wg.Wait()
    return nil
}

In each goroutine, the function work() is called 10 times. If one call to work() returns an error in any of the running goroutines, I want all the goroutines to stop immediately and the program to exit. Is it OK to use os.Exit() here? How should I handle this?


Edit: this question is different from "how to stop a goroutine", as here I need to close all goroutines if an error occurs in one.

1 answer

  • dongluolie3487 2017-08-04 09:04

    You may use the context package, which was created for things like this ("carries deadlines, cancelation signals...").

    Create a context that can publish cancelation signals with context.WithCancel() (the parent context may be the one returned by context.Background()). It returns the new context together with a cancel() function, which you call to cancel the worker goroutines or, more precisely, to signal the intent to cancel.
    In the worker goroutines, check whether cancelation has been requested by checking whether the channel returned by Context.Done() is closed. The easiest way is to attempt to receive from it, which proceeds immediately if the channel is closed. To make the check non-blocking (so the worker can continue if the channel is not closed), use a select statement with a default branch.
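
    As a minimal sketch of that non-blocking check on its own: the helper name canceled() and the demo() wrapper are made up for illustration and are not part of the context package.

```go
package main

import (
	"context"
	"fmt"
)

// canceled reports whether ctx has been canceled, without blocking.
// Hypothetical helper name; not part of the context package.
func canceled(ctx context.Context) bool {
	select {
	case <-ctx.Done(): // Done() is closed once cancel() has been called
		return true
	default: // channel still open: do not wait, just report
		return false
	}
}

// demo checks the context before and after calling cancel().
func demo() (before, after bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // calling cancel more than once is allowed

	before = canceled(ctx)
	cancel()
	after = canceled(ctx)
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before, after) // false true
}
```

    Without the default branch, the select would block until some goroutine calls cancel(), which is not what a worker wants between two units of work.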

    I will use the following work() implementation, which simulates a 10% failure chance, and simulates 1 second of work:

    func work(i int) (int, error) {
        if rand.Intn(100) < 10 { // 10% chance of failure
            return 0, errors.New("random error")
        }
        time.Sleep(time.Second)
        return 100 + i, nil
    }
    

    And the doAllWork() may look like this:

    func doAllWork() error {
        var wg sync.WaitGroup
    
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel() // Make sure it's called to release resources even if no errors
    
        for i := 0; i < 2; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
    
                for j := 0; j < 10; j++ {
                    // Check if any error occurred in any other goroutine:
                    select {
                    case <-ctx.Done():
                        return // Error somewhere, terminate
                    default: // The default branch is a must to avoid blocking
                    }
                    result, err := work(j)
                    if err != nil {
                        fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
                        cancel()
                        return
                    }
                    fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
                }
            }(i)
        }
        wg.Wait()
    
        return ctx.Err()
    }
    

    This is how it can be tested:

    func main() {
        rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
        fmt.Printf("doAllWork: %v\n", doAllWork())
    }
    

    Output (try it on the Go Playground):

    Worker #0 finished 0, result: 100.
    Worker #1 finished 0, result: 100.
    Worker #1 finished 1, result: 101.
    Worker #0 finished 1, result: 101.
    Worker #0 finished 2, result: 102.
    Worker #1 finished 2, result: 102.
    Worker #1 finished 3, result: 103.
    Worker #1 during 4, error: random error
    Worker #0 finished 3, result: 103.
    doAllWork: context canceled
    

    If there were no errors, e.g. when using the following work() function:

    func work(i int) (int, error) {
        time.Sleep(time.Second)
        return 100 + i, nil
    }
    

    the output would look like this (try it on the Go Playground):

    Worker #0 finished 0, result: 100.
    Worker #1 finished 0, result: 100.
    Worker #1 finished 1, result: 101.
    Worker #0 finished 1, result: 101.
    Worker #0 finished 2, result: 102.
    Worker #1 finished 2, result: 102.
    Worker #1 finished 3, result: 103.
    Worker #0 finished 3, result: 103.
    Worker #0 finished 4, result: 104.
    Worker #1 finished 4, result: 104.
    Worker #1 finished 5, result: 105.
    Worker #0 finished 5, result: 105.
    Worker #0 finished 6, result: 106.
    Worker #1 finished 6, result: 106.
    Worker #1 finished 7, result: 107.
    Worker #0 finished 7, result: 107.
    Worker #0 finished 8, result: 108.
    Worker #1 finished 8, result: 108.
    Worker #1 finished 9, result: 109.
    Worker #0 finished 9, result: 109.
    doAllWork: <nil>
    

    Notes:

    Basically we only used the Done() channel of the context, so it seems we could just as easily (if not more easily) use a plain done channel instead of the Context, closing the channel to do what cancel() does in the above solution.

    This is not true. A plain done channel works only if exactly one goroutine may close it, but in our case any of the workers may do so, and attempting to close an already closed channel panics (for details see: How does a non initialized channel behave?). You would therefore need some kind of synchronization / exclusion around close(done), which would make the code less readable and more complex. This is what the cancel() function takes care of under the hood, hidden from view, so cancel() may safely be called multiple times, which keeps the calling code simpler.
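
    To illustrate the extra bookkeeping a plain done channel would need, here is a rough sketch that guards close(done) with sync.Once. The stopper type and stopFromMany() are hypothetical names used only for this example, and this is not a description of how the context package is implemented internally.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper is a hypothetical helper: a done channel that may be
// "closed" from any number of goroutines without panicking.
type stopper struct {
	once sync.Once
	done chan struct{}
}

func newStopper() *stopper {
	return &stopper{done: make(chan struct{})}
}

// Stop closes the done channel on the first call; later calls do nothing.
func (s *stopper) Stop() {
	s.once.Do(func() { close(s.done) })
}

// stopFromMany calls Stop from n goroutines at once and reports
// whether the done channel ended up closed.
func stopFromMany(n int) bool {
	s := newStopper()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Stop() // a bare close(s.done) here could panic on the second call
		}()
	}
	wg.Wait()

	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println("closed:", stopFromMany(5)) // closed: true
}
```

    It works, but it is one more type to write and get right, whereas cancel() gives the same "safe to call many times" behaviour for free.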

    How to get and return the error(s) from the workers?

    For this you may use an error channel:

    errs := make(chan error, 2) // Buffer for 2 errors
    

    And inside the workers when an error is encountered, send it on the channel instead of printing it:

    result, err := work(j)
    if err != nil {
        errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
        cancel()
        return
    }
    

    And after wg.Wait(), if there was an error, return it (and nil otherwise):

    // Return (first) error, if any:
    if ctx.Err() != nil {
        return <-errs
    }
    return nil
    

    Output this time (try this on the Go Playground):

    Worker #0 finished 0, result: 100.
    Worker #1 finished 0, result: 100.
    Worker #1 finished 1, result: 101.
    Worker #0 finished 1, result: 101.
    Worker #0 finished 2, result: 102.
    Worker #1 finished 2, result: 102.
    Worker #1 finished 3, result: 103.
    Worker #0 finished 3, result: 103.
    doAllWork: Worker #1 during 4, error: random error
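
    For reference, the fragments above could be assembled roughly as follows. This is only a sketch with a few assumptions of my own: doAllWork() takes the work function as a parameter so it can be run with a failing and a non-failing variant, the sleep and the progress output are left out, and failAtFour(), alwaysOK() and errBoom are made-up names for the demo.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

const workers = 2 // same number of workers as in the answer

// doAllWork runs the workers and returns the first error received, if any.
// Passing work as a parameter is an assumption made for this sketch.
func doAllWork(work func(int) (int, error)) error {
	var wg sync.WaitGroup

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errs := make(chan error, workers) // one slot per worker, so sends never block

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				select {
				case <-ctx.Done():
					return // another worker failed, stop here
				default:
				}
				if _, err := work(j); err != nil {
					// Send the error first, then signal the others.
					errs <- fmt.Errorf("worker #%d during %d: %w", i, j, err)
					cancel()
					return
				}
			}
		}(i)
	}
	wg.Wait()

	if ctx.Err() != nil {
		return <-errs // an error is always sent before cancel() is called
	}
	return nil
}

var errBoom = errors.New("boom") // hypothetical error for the demo

// failAtFour fails on the fifth unit of work.
func failAtFour(j int) (int, error) {
	if j == 4 {
		return 0, errBoom
	}
	return 100 + j, nil
}

// alwaysOK never fails.
func alwaysOK(j int) (int, error) { return 100 + j, nil }

func main() {
	fmt.Println("failing run:", doAllWork(failAtFour))
	fmt.Println("clean run:", doAllWork(alwaysOK))
}
```

    Which worker reports the error in the failing run depends on scheduling, so the exact message may differ between runs.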
    

    Note that I used a buffered channel with a buffer size equal to the number of workers, which ensures sending on it is always non-blocking. This also gives you the possibility to receive and process all errors, not just one (e.g. the first). Another option could be to use a buffered channel to hold only 1, and do a non-blocking send on it, which could look like this:

    errs := make(chan error, 1) // Buffered only for the first error
    
    // ...and inside the worker:
    
    result, err := work(j)
    if err != nil {
        // Non-blocking send:
        select {
        case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
        default:
        }
        cancel()
        return
    }
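
    Coming back to the first variant (buffer size equal to the number of workers): receiving all errors instead of just the first could be sketched like this. collectErrors() and failOdd() are hypothetical names, and cancelation is left out here to keep the focus on draining the channel.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// collectErrors runs n workers; each sends at most one error on a
// channel buffered for n errors, and all of them are returned.
// Hypothetical helper for illustration only.
func collectErrors(n int, work func(worker int) error) []error {
	errs := make(chan error, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := work(i); err != nil {
				errs <- err // never blocks: one slot per worker
			}
		}(i)
	}
	wg.Wait()
	close(errs) // all senders have returned, so closing is safe

	var all []error
	for err := range errs { // drains the buffer, then stops at the close
		all = append(all, err)
	}
	return all
}

// failOdd is a made-up work function: odd-numbered workers fail.
func failOdd(worker int) error {
	if worker%2 == 1 {
		return errors.New("odd worker failed")
	}
	return nil
}

func main() {
	all := collectErrors(4, failOdd)
	fmt.Println(len(all), "errors") // 2 errors
}
```

    Closing the channel only after wg.Wait() matters: at that point no worker can still send, so the range loop ends cleanly.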
    