dopr25398 2017-08-03 16:41
Views: 64
Accepted

Inconsistent closures between parallel and sequential execution

I have attempted to write a generic function that can execute functions in parallel or sequentially. While testing it, I found some very unexpected behavior regarding closures. In the code below, I define a list of functions that accept no parameters and return an error. The functions also use a for loop variable in a closure, but I'm using the trick of defining a new variable within the loop in an attempt to avoid capturing the loop variable itself.

I'm expecting that I can call these functions sequentially or concurrently with the same effect but I'm seeing different results. It's as if the closure variable is being captured but only when run concurrently.

As far as I can tell, this is not the usual case of capturing a loop variable. As I mentioned, I'm defining a new variable within the loop. Also, I'm not running the closure function within the loop. I'm generating a list of functions within the loop but I'm executing the functions after the loop.

I'm using go version go1.8.3 linux/amd64.

package closure_test

import (
    "sync"
    "testing"
)

// MergeErrors merges multiple channels of errors.
// Based on https://blog.golang.org/pipelines.
func MergeErrors(cs ...<-chan error) <-chan error {
    var wg sync.WaitGroup
    out := make(chan error)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan error) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// WaitForPipeline waits for results from all error channels.
// It returns early on the first error.
func WaitForPipeline(errs ...<-chan error) error {
    errc := MergeErrors(errs...)
    for err := range errc {
        if err != nil {
            return err
        }
    }
    return nil
}

func RunInParallel(funcs ...func() error) error {
    var errcList [](<-chan error)
    for _, f := range funcs {
        errc := make(chan error, 1)
        errcList = append(errcList, errc)
        go func() {
            err := f()
            if err != nil {
                errc <- err
            }
            close(errc)
        }()
    }
    return WaitForPipeline(errcList...)
}

func RunSequentially(funcs ...func() error) error {
    for _, f := range funcs {
        err := f()
        if err != nil {
            return err
        }
    }
    return nil
}

func validateOutputChannel(t *testing.T, out chan int, n int) {
    m := map[int]bool{}
    for i := 0; i < n; i++ {
        m[<-out] = true
    }
    if len(m) != n {
        t.Errorf("Output channel has %v unique items; wanted %v", len(m), n)
    }
}

// This fails because j is being captured.
func TestClosure1sp(t *testing.T) {
    n := 4
    out := make(chan int, n*2)
    var funcs [](func() error)
    for i := 0; i < n; i++ {
        j := i // define a new variable that has scope only inside the current loop iteration
        t.Logf("outer i=%v, j=%v", i, j)
        f := func() error {
            t.Logf("inner i=%v, j=%v", i, j)
            out <- j
            return nil
        }
        funcs = append(funcs, f)
    }
    t.Logf("Running funcs sequentially")
    if err := RunSequentially(funcs...); err != nil {
        t.Fatal(err)
    }
    validateOutputChannel(t, out, n)
    t.Logf("Running funcs in parallel")
    if err := RunInParallel(funcs...); err != nil {
        t.Fatal(err)
    }
    close(out)
    validateOutputChannel(t, out, n)
}

Below is the output from the test function above.

closure_test.go:91: outer i=0, j=0
closure_test.go:91: outer i=1, j=1
closure_test.go:91: outer i=2, j=2
closure_test.go:91: outer i=3, j=3
closure_test.go:99: Running funcs sequentially
closure_test.go:93: inner i=4, j=0
closure_test.go:93: inner i=4, j=1
closure_test.go:93: inner i=4, j=2
closure_test.go:93: inner i=4, j=3
closure_test.go:104: Running funcs in parallel
closure_test.go:93: inner i=4, j=3
closure_test.go:93: inner i=4, j=3
closure_test.go:93: inner i=4, j=3
closure_test.go:93: inner i=4, j=3
closure_test.go:80: Output channel has 1 unique items; wanted 4

Any ideas? Is this a bug in Go?

2 Answers

  • dppxp79175 2017-08-03 16:52

    Always run your tests with the -race flag (go test -race). In your case, RunInParallel does not create a new f for each iteration before the goroutine closure captures it:

    func RunInParallel(funcs ...func() error) error {
        var errcList [](<-chan error)
        for _, f := range funcs {
    
            f := f // << HERE
    
            errc := make(chan error, 1)
            errcList = append(errcList, errc)
            go func() {
                err := f()
                if err != nil {
                    errc <- err
                }
                close(errc)
            }()
        }
        return WaitForPipeline(errcList...)
    }
    

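    Without that line, every goroutine closes over the same range variable f. The goroutines typically start running only after the loop has finished, by which point f holds the last function, so you launched the last f several times instead of each one once. The j := i copy in your test is fine; the capture happens one level up, in RunInParallel.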

    This answer was selected by the asker as the best answer.