I'm trying to run several tasks concurrently and return immediately if there is any error without to wait for all of the routines to return. The code looks as below. I've stripped out the noise to make it easier to digest but I can post the full code if the leak is not obvious.
It's worth to note that I'm deploying this on google app engine. I can't reproduce the leak on my machine but when I replace the concurrency after // Consume the results
comment the app is working fine, though I don't understand why because the code looks correct to me.
package main
import "fmt"
import "sync"
import "errors"
func main() {
indexes := []int{1, 2, 3, 4, 5, 6, 7}
devCh := make(chan int, 7)
stopCh := make(chan struct{})
errCh := make(chan error, 7)
var wg sync.WaitGroup
go func() {
for _, sub := range indexes {
wg.Add(1)
go func(sub int) {
defer wg.Done()
// some code which creates other
// wait groups and spans other go routines
// handle errors
if sub == 99 { // unreachable
errCh <- errors.New("new error")
}
}(sub)
select {
// If there is any error we better stop the
// loop
case <-stopCh:
return
default:
}
devCh <- sub
}
wg.Wait()
close(devCh)
}()
// Consume the results
var results []int
var wt sync.WaitGroup
wt.Add(1)
go func() {
defer wt.Done()
for s := range devCh {
results = append(results, s)
}
return
}()
done := make(chan struct{})
go func() {
wt.Wait()
close(done)
}()
L:
for {
select {
case err := <-errCh:
fmt.Printf("error was %v", err)
close(stopCh)
return
case <-done:
break L
default:
}
}
fmt.Printf("all done, %v", results)
}
Edit: added some working code.
Edit: added code closer to the real code which may explain the need of the for loop.
package main
import "fmt"
import "sync"
import "errors"
func main() {
indexes := []int{1, 2, 3, 4, 5, 6, 7}
indexesString := []string{"a", "b", "c", "d"}
devChS := make(chan string, 1000)
devCh := make(chan int, 7)
stopCh := make(chan struct{})
errCh := make(chan error, 7)
var wg sync.WaitGroup
go func() {
for _, sub := range indexes {
wg.Add(1)
go func(sub int) {
defer wg.Done()
// some code which creates other
// wait groups and spans other go routines
// handle errors
if sub == 99 { // unreachable
errCh <- errors.New("new error")
}
wg.Add(1)
go func(sub int) {
defer wg.Done()
for _, s := range indexesString {
devChS <- fmt.Sprintf("%s %s", s, sub)
}
return
}(sub)
}(sub)
select {
// If there is any error we better stop the
// loop
case <-stopCh:
return
default:
}
devCh <- sub
}
wg.Wait()
close(devCh)
close(devChS)
}()
// Consume the results
var results = struct {
integers []int
strings []string
}{}
var wt sync.WaitGroup
wt.Add(1)
go func() {
defer wt.Done()
for s := range devCh {
results.integers = append(results.integers, s)
}
return
}()
wt.Add(1)
go func() {
defer wt.Done()
for s := range devChS {
results.strings = append(results.strings, s)
}
return
}()
done := make(chan struct{})
go func() {
wt.Wait()
close(done)
}()
L:
for {
select {
case err := <-errCh:
fmt.Printf("error was %v", err)
close(stopCh)
return
case <-done:
break L
default:
}
}
fmt.Printf("all done, can return the results: %v", results)
}