douji1999
2015-03-26 01:14

Where is the goroutine leak?

Accepted

I'm trying to run several tasks concurrently and return immediately if any of them fails, without waiting for all of the goroutines to return. The code is below. I've stripped out the noise to make it easier to digest, but I can post the full code if the leak is not obvious. It's worth noting that I'm deploying this on Google App Engine. I can't reproduce the leak on my machine, but when I replace the concurrent code after the // Consume the results comment, the app works fine. I don't understand why, because the code looks correct to me.

package main

import "fmt"
import "sync"
import "errors"

func main() {
    indexes := []int{1, 2, 3, 4, 5, 6, 7}
    devCh := make(chan int, 7)
    stopCh := make(chan struct{})
    errCh := make(chan error, 7)
    var wg sync.WaitGroup
    go func() {
        for _, sub := range indexes {
            wg.Add(1)
            go func(sub int) {
                defer wg.Done()
                // some code which creates other
                // wait groups and spawns other goroutines
                // handle errors
                if sub == 99 { // unreachable
                    errCh <- errors.New("new error")

                }
            }(sub)
            select {
            // If there is any error we better stop the
            // loop
            case <-stopCh:
                return
            default:
            }
            devCh <- sub
        }
        wg.Wait()
        close(devCh)
    }()
    // Consume the results
    var results []int
    var wt sync.WaitGroup
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devCh {
            results = append(results, s)
        }
        return
    }()
    done := make(chan struct{})
    go func() {
        wt.Wait()
        close(done)
    }()

L:
    for {
        select {
        case err := <-errCh:
            fmt.Printf("error was %v", err)
            close(stopCh)
            return
        case <-done:
            break L
        default:
        }
    }
    fmt.Printf("all done, %v", results)
}

Edit: added some working code.

Edit: added code closer to the real code, which may explain the need for the for loop.

package main

import "fmt"
import "sync"
import "errors"

func main() {
    indexes := []int{1, 2, 3, 4, 5, 6, 7}
    indexesString := []string{"a", "b", "c", "d"}
    devChS := make(chan string, 1000)

    devCh := make(chan int, 7)
    stopCh := make(chan struct{})
    errCh := make(chan error, 7)
    var wg sync.WaitGroup
    go func() {
        for _, sub := range indexes {
            wg.Add(1)
            go func(sub int) {
                defer wg.Done()
                // some code which creates other
                // wait groups and spawns other goroutines
                // handle errors
                if sub == 99 { // unreachable
                    errCh <- errors.New("new error")

                }
                wg.Add(1)
                go func(sub int) {
                    defer wg.Done()
                    for _, s := range indexesString {
                        devChS <- fmt.Sprintf("%s %d", s, sub) // sub is an int, so %d rather than %s

                    }

                    return
                }(sub)
            }(sub)
            select {
            // If there is any error we better stop the
            // loop
            case <-stopCh:
                return
            default:
            }
            devCh <- sub
        }
        wg.Wait()
        close(devCh)
        close(devChS)
    }()
    // Consume the results
    var results = struct {
        integers []int
        strings  []string
    }{}
    var wt sync.WaitGroup
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devCh {
            results.integers = append(results.integers, s)
        }
        return
    }()
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devChS {
            results.strings = append(results.strings, s)
        }
        return
    }()
    done := make(chan struct{})
    go func() {
        wt.Wait()
        close(done)
    }()

L:
    for {
        select {
        case err := <-errCh:
            fmt.Printf("error was %v", err)
            close(stopCh)
            return
        case <-done:
            break L
        default:
        }
    }
    fmt.Printf("all done, can return the results: %v", results)
}

1 answer

  • duanjiaolao1187 6 years ago

    tl;dr: A loop that does nothing but repeat a non-blocking check until it succeeds can cause hard-to-diagnose trouble (at a minimum, it can overuse CPU); using a blocking check can fix it.

    I'm not all that sure about the details of your case; I wrote a loop like yours that consistently hangs with "process took too long" on the Playground, but when I run it locally it does complete.

    As I commented, I'd aim for a simpler design, too.
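
As one possible reading of "a simpler design", here is a minimal sketch that uses a single buffered channel and blocking receives only. The names runAll, outcome, task, and errDemo are hypothetical and are not taken from the question's real code; the sketch assumes each task yields one int or an error.

```go
package main

import (
	"errors"
	"fmt"
)

// errDemo is a hypothetical error used only for this illustration.
var errDemo = errors.New("new error")

// outcome carries either a value or an error from one worker.
type outcome struct {
	val int
	err error
}

// runAll starts one goroutine per index and returns all results,
// or the first error it receives. task stands in for the real work.
func runAll(indexes []int, task func(int) (int, error)) ([]int, error) {
	// Buffered to len(indexes) so every worker can send and exit
	// even after an early return; no goroutine stays blocked.
	out := make(chan outcome, len(indexes))
	for _, i := range indexes {
		go func(i int) {
			v, err := task(i)
			out <- outcome{v, err}
		}(i)
	}
	results := make([]int, 0, len(indexes))
	for range indexes {
		o := <-out // blocking receive: no polling loop
		if o.err != nil {
			return nil, o.err
		}
		results = append(results, o.val)
	}
	return results, nil
}

func main() {
	res, err := runAll([]int{1, 2, 3}, func(i int) (int, error) {
		return i * 2, nil
	})
	fmt.Println(len(res), err)

	_, err = runAll([]int{1, 2, 3}, func(i int) (int, error) {
		if i == 2 {
			return 0, errDemo
		}
		return i, nil
	})
	fmt.Println(err)
}
```

Results arrive in completion order, not index order. Workers that are still running after an early return finish on their own, because the buffer has room for every send.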


    Go only has limited pre-emption of running goroutines: the running thread only yields control to the goroutine scheduler when a blocking operation (such as I/O, a channel operation, or waiting to take a lock) happens.

    So with GOMAXPROCS=1, if the (one) running thread starts looping, nothing else will necessarily get a chance to run.

    A for { select { ...default: } } can therefore start a loop that checks for items in a channel but never gives up control of the main thread so that another goroutine can write an item. Other code gets to run anyway when GOMAXPROCS is over 1, but not when it's 1, as it is on App Engine (or the Playground). The behavior depends not only on GOMAXPROCS but also on which goroutine happens to run first, which isn't necessarily defined.
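
Since the behavior depends on GOMAXPROCS, it may help to check what the setting actually is in each environment. The sketch below is only a diagnostic, and the helper name currentProcs is hypothetical. It relies on the documented behavior that runtime.GOMAXPROCS reports the current setting without changing it when called with an argument below 1.

```go
package main

import (
	"fmt"
	"runtime"
)

// currentProcs reports the GOMAXPROCS setting without changing it:
// an argument below 1 only queries the current value.
func currentProcs() int {
	return runtime.GOMAXPROCS(0)
}

func main() {
	fmt.Println("GOMAXPROCS:", currentProcs())
	fmt.Println("NumCPU:", runtime.NumCPU())
}
```

Running this locally and in the deployment environment would show whether the two differ in the way described above.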

    To avoid that situation, remove the default: so the select is a blocking operation that yields to the scheduler when it can't receive an item, allowing other code to run. You can generalize this to other cases where you might loop doing a nonblocking check; any of them could keep resources busy constantly rechecking when a blocking call would not. Even when GOMAXPROCS>1 or the runtime's limited preemption saves you, polling (as repeated checking is called) can still consume more CPU than blocking.
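
Applied to a wait loop like the one in the question, removing default: might look like the sketch below. The helper waitResult and the error errDemo are hypothetical names for illustration; errCh and done mirror the question's channels.

```go
package main

import (
	"errors"
	"fmt"
)

// errDemo is a hypothetical error used only for this illustration.
var errDemo = errors.New("new error")

// waitResult blocks until an error arrives or done is closed.
// There is no default case, so the calling goroutine is parked
// while it waits and other goroutines get a chance to run.
func waitResult(errCh <-chan error, done <-chan struct{}) error {
	select {
	case err := <-errCh:
		return err
	case <-done:
		return nil
	}
}

func main() {
	// Success path: a worker closes done and no error is sent.
	errCh := make(chan error, 1)
	done := make(chan struct{})
	go func() { close(done) }()
	fmt.Println("first wait:", waitResult(errCh, done))

	// Error path: an error is already queued and done never closes.
	errCh2 := make(chan error, 1)
	errCh2 <- errDemo
	fmt.Println("second wait:", waitResult(errCh2, make(chan struct{})))
}
```

Because the select blocks, no surrounding for loop is needed here: the function returns as soon as either channel is ready.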

    For example, this fails with "process took too long" on the Playground, though annoyingly it completes reliably on my machine:

    package main
    
    import "fmt"
    
    func main() {
        c := make(chan struct{})
        go func() { c <- struct{}{} }()
        for {
            select {
            case <-c:
                fmt.Println("success")
                return
            default:
            }
        }
    } 
    

    I can't tell if there are other problems, but the hang for a pattern similar to the sample is noteworthy.

