douji1999 2015-03-26 01:14

Where is the goroutine leak?

I'm trying to run several tasks concurrently and return immediately if there is any error, without waiting for all of the goroutines to return. The code looks like the example below. I've stripped out the noise to make it easier to digest, but I can post the full code if the leak is not obvious. It's worth noting that I'm deploying this on Google App Engine. I can't reproduce the leak on my machine, but when I take out the concurrency after the // Consume the results comment, the app works fine. I don't understand why, because the code looks correct to me.

package main

import "fmt"
import "sync"
import "errors"

func main() {
    indexes := []int{1, 2, 3, 4, 5, 6, 7}
    devCh := make(chan int, 7)
    stopCh := make(chan struct{})
    errCh := make(chan error, 7)
    var wg sync.WaitGroup
    go func() {
        for _, sub := range indexes {
            wg.Add(1)
            go func(sub int) {
                defer wg.Done()
                // some code which creates other
                // wait groups and spawns other goroutines
                // handle errors
                if sub == 99 { // unreachable 
                    errCh <- errors.New("new error")

                }
            }(sub)
            select {
            // If there is any error we better stop the
            // loop
            case <-stopCh:
                return
            default:
            }
            devCh <- sub
        }
        wg.Wait()
        close(devCh)
    }()
    // Consume the results
    var results []int
    var wt sync.WaitGroup
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devCh {
            results = append(results, s)
        }
        return
    }()
    done := make(chan struct{})
    go func() {
        wt.Wait()
        close(done)
    }()

L:
    for {
        select {
        case err := <-errCh:
            fmt.Printf("error was %v", err)
            close(stopCh)
            return
        case <-done:
            break L
        default:
        }
    }
    fmt.Printf("all done, %v", results)
}

Edit: added some working code.

Edit: added code closer to the real code, which may explain the need for the for loop.

package main

import "fmt"
import "sync"
import "errors"

func main() {
    indexes := []int{1, 2, 3, 4, 5, 6, 7}
    indexesString := []string{"a", "b", "c", "d"}
    devChS := make(chan string, 1000)

    devCh := make(chan int, 7)
    stopCh := make(chan struct{})
    errCh := make(chan error, 7)
    var wg sync.WaitGroup
    go func() {
        for _, sub := range indexes {
            wg.Add(1)
            go func(sub int) {
                defer wg.Done()
                // some code which creates other
                // wait groups and spawns other goroutines
                // handle errors
                if sub == 99 { // unreachable
                    errCh <- errors.New("new error")

                }
                wg.Add(1)
                go func(sub int) {
                    defer wg.Done()
                    for _, s := range indexesString {
                        devChS <- fmt.Sprintf("%s %d", s, sub)

                    }

                    return
                }(sub)
            }(sub)
            select {
            // If there is any error we better stop the
            // loop
            case <-stopCh:
                return
            default:
            }
            devCh <- sub
        }
        wg.Wait()
        close(devCh)
        close(devChS)
    }()
    // Consume the results
    var results = struct {
        integers []int
        strings  []string
    }{}
    var wt sync.WaitGroup
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devCh {
            results.integers = append(results.integers, s)
        }
        return
    }()
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devChS {
            results.strings = append(results.strings, s)
        }
        return
    }()
    done := make(chan struct{})
    go func() {
        wt.Wait()
        close(done)
    }()

L:
    for {
        select {
        case err := <-errCh:
            fmt.Printf("error was %v", err)
            close(stopCh)
            return
        case <-done:
            break L
        default:
        }
    }
    fmt.Printf("all done, can return the results: %v", results)
}

1 answer

  • duanjiaolao1187 2015-03-26 04:44

    tl;dr: A loop that does nothing but repeat a non-blocking check until it succeeds can cause hard-to-diagnose trouble (at a minimum, it can overuse CPU); using a blocking check can fix it.

    I'm not all that sure about the details of your case; I wrote a loop like yours that consistently hangs with "process took too long" on the Playground, but when I run it locally it does complete.

    As I commented, I'd aim for a simpler design, too.


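    Go only has limited preemption of running goroutines: the running thread generally yields control to the goroutine scheduler only when a blocking operation happens, such as I/O, a channel operation, or waiting to take a lock. (This describes the runtime at the time of writing; Go 1.14 later added asynchronous preemption, which makes tight loops far less likely to starve other goroutines, although polling still wastes CPU.)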

    So with GOMAXPROCS=1, if the one running thread starts looping, nothing else is guaranteed to get a chance to run.

    A for { select { ... default: } } loop can therefore keep checking a channel for items without ever giving up the thread, so the goroutine that would write an item never gets to run. Other code gets to run anyway when GOMAXPROCS is greater than 1, but not when it is 1, as it is on App Engine (or the Playground). The behavior depends not only on GOMAXPROCS but also on which goroutine happens to run first, which isn't defined.

    To avoid that situation, remove the default: case so that the select becomes a blocking operation that yields to the scheduler when it can't receive an item, allowing other code to run. This generalizes to any loop that repeatedly does a nonblocking check: such a loop can keep resources busy rechecking constantly, where a blocking call would not. Even when GOMAXPROCS > 1 or the runtime's limited preemption saves you, polling (as this repeated checking is called) still consumes more CPU than blocking.

    For example, this fails with "process took too long" on the Playground, though annoyingly it completes reliably on my machine:

    package main
    
    import "fmt"
    
    func main() {
        c := make(chan struct{})
        go func() { c <- struct{}{} }()
        for {
            select {
            case <-c:
                fmt.Println("success")
                return
            default:
            }
        }
    } 
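With the default: case removed, the same program no longer depends on the scheduler interrupting the loop. A select whose only remaining case is a receive is just a blocking receive, so this minimal sketch uses <-c directly. It also calls runtime.GOMAXPROCS(1) (an addition, to mimic the single-threaded environments mentioned above); the helper name waitAndReport is hypothetical.

```go
package main

import (
	"fmt"
	"runtime"
)

// waitAndReport blocks until a value arrives on c. A blocked receive parks
// this goroutine, which frees the only thread for the sender.
func waitAndReport(c <-chan struct{}) string {
	<-c
	return "success"
}

func main() {
	runtime.GOMAXPROCS(1) // mimic the single-threaded environments above
	c := make(chan struct{})
	go func() { c <- struct{}{} }()
	fmt.Println(waitAndReport(c)) // prints "success"
}
```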
    

    I can't tell whether there are other problems, but the fact that a pattern similar to your sample hangs is noteworthy.
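One further candidate, offered as an observation about the question's code rather than something the answer checked: when stopCh is closed, the producer goroutine returns before reaching close(devCh), so the consumer's for s := range devCh never ends and that goroutine stays blocked for good. A minimal sketch of a producer that closes its output on every path with defer, and that also watches the stop channel while sending; the names produce and consume are hypothetical:

```go
package main

import "fmt"

// produce sends each index on out until stop is closed. The deferred
// close runs on every return path, so a consumer ranging over out
// always terminates.
func produce(indexes []int, out chan<- int, stop <-chan struct{}) {
	defer close(out)
	for _, sub := range indexes {
		select {
		case out <- sub:
		case <-stop:
			return
		}
	}
}

// consume drains in until it is closed.
func consume(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	out := make(chan int)
	stop := make(chan struct{})
	close(stop) // simulate an error that has already asked the producer to stop

	res := make(chan []int)
	go func() { res <- consume(out) }()

	produce([]int{1, 2, 3, 4, 5, 6, 7}, out, stop)
	got := <-res // returns because produce closed out on its way out
	fmt.Println("consumer exited after", len(got), "values")
}
```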

    This answer was selected by the asker as the best answer.
