douji1999 2015-03-26 01:14
浏览 14
已采纳

Go例行泄漏在哪里?

I'm trying to run several tasks concurrently and return immediately if there is any error without to wait for all of the routines to return. The code looks as below. I've stripped out the noise to make it easier to digest but I can post the full code if the leak is not obvious. It's worth to note that I'm deploying this on google app engine. I can't reproduce the leak on my machine but when I replace the concurrency after // Consume the results comment the app is working fine, though I don't understand why because the code looks correct to me.

package main

import "fmt"
import "sync"
import "errors"

func main() {
    indexes := []int{1, 2, 3, 4, 5, 6, 7}
    devCh := make(chan int, 7)
    stopCh := make(chan struct{})
    errCh := make(chan error, 7)
    var wg sync.WaitGroup
    go func() {
        for _, sub := range indexes {
            wg.Add(1)
            go func(sub int) {
                defer wg.Done()
                // some code which creates other
                // wait groups and spans other go routines
                // handle errors
                if sub == 99 { // unreachable 
                    errCh <- errors.New("new error")

                }
            }(sub)
            select {
            // If there is any error we better stop the
            // loop
            case <-stopCh:
                return
            default:
            }
            devCh <- sub
        }
        wg.Wait()
        close(devCh)
    }()
    // Consume the results
    var results []int
    var wt sync.WaitGroup
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devCh {
            results = append(results, s)
        }
        return
    }()
    done := make(chan struct{})
    go func() {
        wt.Wait()
        close(done)
    }()

L:
    for {
        select {
        case err := <-errCh:
            fmt.Printf("error was %v", err)
            close(stopCh)
            return
        case <-done:
            break L
        default:
        }
    }
    fmt.Printf("all done, %v", results)
}

Edit: added some working code.

Edit: added code closer to the real code which may explain the need of the for loop.

package main

import "fmt"
import "sync"
import "errors"

func main() {
    indexes := []int{1, 2, 3, 4, 5, 6, 7}
    indexesString := []string{"a", "b", "c", "d"}
    devChS := make(chan string, 1000)

    devCh := make(chan int, 7)
    stopCh := make(chan struct{})
    errCh := make(chan error, 7)
    var wg sync.WaitGroup
    go func() {
        for _, sub := range indexes {
            wg.Add(1)
            go func(sub int) {
                defer wg.Done()
                // some code which creates other
                // wait groups and spans other go routines
                // handle errors
                if sub == 99 { // unreachable
                    errCh <- errors.New("new error")

                }
                wg.Add(1)
                go func(sub int) {
                    defer wg.Done()
                    for _, s := range indexesString {
                        devChS <- fmt.Sprintf("%s %s", s, sub)

                    }

                    return
                }(sub)
            }(sub)
            select {
            // If there is any error we better stop the
            // loop
            case <-stopCh:
                return
            default:
            }
            devCh <- sub
        }
        wg.Wait()
        close(devCh)
        close(devChS)
    }()
    // Consume the results
    var results = struct {
        integers []int
        strings  []string
    }{}
    var wt sync.WaitGroup
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devCh {
            results.integers = append(results.integers, s)
        }
        return
    }()
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devChS {
            results.strings = append(results.strings, s)
        }
        return
    }()
    done := make(chan struct{})
    go func() {
        wt.Wait()
        close(done)
    }()

L:
    for {
        select {
        case err := <-errCh:
            fmt.Printf("error was %v", err)
            close(stopCh)
            return
        case <-done:
            break L
        default:
        }
    }
    fmt.Printf("all done, can return the results: %v", results)
}
  • 写回答

1条回答 默认 最新

  • duanjiaolao1187 2015-03-26 04:44
    关注

    tl;dr: A loop that does nothing but repeat a non-blocking check until it succeeds can cause hard-to-diagnose trouble (at a minimum, it can overuse CPU); using a blocking check can fix it.

    I'm not all that sure about the details of your case; I wrote a loop like yours that consistently hangs with "process took too long" on the Playground, but when I run it locally it does complete.

    As I commented, I'd aim for a simpler design, too.


    Go only has limited pre-emption of running goroutines: the running thread only yields control to the goroutine scheduler when a blocking operation (an like I/O or channel op or waiting to take a lock) happens.

    So with GOMAXPROCS=1, if the (one) running thread starts looping, nothing else will necessarily get a chance to run.

    A for { select { ...default: } } can therefore start a loop checking for items in a channel but never give up control of the main thread so that another goroutine can write an item. Other code gets to run anyway when when GOMAXPROCS is over 1, but not when it's 1 as it is on App Engine (or the Playground). The behavior depends not only on GOMAXPROCS, but on which goroutine happens to run first, which isn't necessarily defined.

    To avoid that situation, remove the default: so the select is a blocking operation that yields to the scheduler when it can't receive an item, allowing other code to run. You can generalize this to other cases where you might loop doing a nonblocking check; any of them could keep resources busy constantly rechecking when a blocking call would not. When GOMAXPROCS>1 or the runtime's limited preemption saves you, polling (as repeated checking is called) can still consume more CPU than blocking.

    For example, this fails with "process took too long" on the Playground, though annoyingly it completes reliably on my machine:

    package main
    
    import "fmt"
    
    func main() {
        c := make(chan struct{})
        go func() { c <- struct{}{} }()
        for {
            select {
            case <-c:
                fmt.Println("success")
                return
            default:
            }
        }
    } 
    

    I can't tell if there are other problems, but the hang for a pattern similar to the sample is noteworthy.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥30 这是哪个作者做的宝宝起名网站
  • ¥60 版本过低apk如何修改可以兼容新的安卓系统
  • ¥25 由IPR导致的DRIVER_POWER_STATE_FAILURE蓝屏
  • ¥50 有数据,怎么建立模型求影响全要素生产率的因素
  • ¥50 有数据,怎么用matlab求全要素生产率
  • ¥15 TI的insta-spin例程
  • ¥15 完成下列问题完成下列问题
  • ¥15 C#算法问题, 不知道怎么处理这个数据的转换
  • ¥15 YoloV5 第三方库的版本对照问题
  • ¥15 请完成下列相关问题!