I wrote a simple code example to understand how a pipeline works. Here it is:
    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        ch1 := make(chan int, 10) // Use buffered channel so as to avoid clogging
        ch2 := make(chan string, 10)
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func1(i, ch1, &wg)
            go func2(ch1, ch2)
        }
        wg.Wait()
        close(ch1)
        for val := range ch2 {
            fmt.Println(val)
        }
    }

    func func1(seconds int, ch chan<- int, wg *sync.WaitGroup) {
        defer wg.Done()
        time.Sleep(time.Duration(seconds) * time.Second)
        fmt.Println(seconds)
        ch <- seconds
    }

    func func2(ch1 chan int, ch2 chan string) {
        for range ch1 {
            ch2 <- "hello"
        }
        close(ch2)
    }
Now, the problem is that I don't get consistent output (I understand it's a concurrency issue, but I haven't fully worked out why).
Output
> go run pipeline-loop.go
0
1
2
hello
hello
> go run pipeline-loop.go
0
1
2
hello
hello
hello
> go run pipeline-loop.go
0
1
2
hello
hello
> go run pipeline-loop.go
0
1
2
hello
hello
> go run pipeline-loop.go
0
1
2
hello
hello
panic: close of closed channel
goroutine 6 [running]:
main.func2(0xc00006c000, 0xc000056180)
/home/projects/go-tuts/pipeline-loop.go:36 +0x72
created by main.main
/home/projects/go-tuts/pipeline-loop.go:16 +0x10f
exit status 2
Someone else changed the code (and it worked) by starting func2 once, outside the loop, but I want one func2 for each iteration of func1.
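For reference, here is a minimal sketch of what that func2-outside-the-loop variant might look like. It is an assumption about the changed version, not the actual code: the names produce, relay and runSingleConsumer are hypothetical stand-ins for func1, func2 and main, and the delays are scaled down to milliseconds. Since only one goroutine ever sends on ch2, it can close ch2 without risking a second close.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// produce mirrors func1: it waits, then sends its value once.
func produce(delay time.Duration, v int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(delay)
	out <- v
}

// relay mirrors func2. It is the only sender on out, so it alone closes out.
func relay(in <-chan int, out chan<- string) {
	for range in {
		out <- "hello"
	}
	close(out)
}

// runSingleConsumer (hypothetical name) wires three producers to one relay
// and collects everything the relay emits.
func runSingleConsumer() []string {
	ch1 := make(chan int, 10)
	ch2 := make(chan string, 10)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go produce(time.Duration(i)*10*time.Millisecond, i, ch1, &wg)
	}
	go relay(ch1, ch2) // started once, outside the loop

	wg.Wait()  // all producers have sent their value
	close(ch1) // lets relay's range loop finish, after which it closes ch2

	var got []string
	for v := range ch2 {
		got = append(got, v)
	}
	return got
}

func main() {
	for _, v := range runSingleConsumer() {
		fmt.Println(v)
	}
}
```

With a single relay, close(ch2) runs exactly once, which is why this shape does not hit the "close of closed channel" panic.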
Problem
So, I want to understand where the WaitGroup and close(ch) should be used.
Thanks.
Temporarya
( A golang noobie )
Update
Based on a user's answer, I changed the code. Now I get the expected output (though this is not yet a solution to the question), but the program still ends in a deadlock. https://play.golang.org/p/O_rp_FLvNh8
    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        ch1 := make(chan int, 10) // Use buffered channel so as to avoid clogging
        ch2 := make(chan string, 10)
        var wg1 sync.WaitGroup
        // var wg2 sync.WaitGroup
        for i := 0; i < 3; i++ {
            wg1.Add(1)
            go func1(i, ch1)
            go func2(ch1, ch2, &wg1)
        }
        for val := range ch2 {
            fmt.Println(val)
        }
        wg1.Wait()
        close(ch1)
        close(ch2)
    }

    // func func1(seconds int, ch chan<- int, wg *sync.WaitGroup) {
    func func1(seconds int, ch chan<- int) {
        // defer wg.Done()
        time.Sleep(time.Duration(seconds) * time.Second)
        fmt.Println(seconds)
        ch <- seconds
    }

    func func2(ch1 chan int, ch2 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for range ch1 {
            ch2 <- "hello"
        }
    }
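In the updated code, main ranges over ch2 before close(ch2) is reached, so the loop never ends, and the func2 goroutines wait on ch1, which is never closed either. One possible way to keep one func2 per iteration is sketched below, as an illustration rather than a definitive fix. It assumes two WaitGroups, one for the senders on each channel, and a separate goroutine that closes each channel once its senders are done. The names produce, relay and runPerIteration are hypothetical, and the delays are scaled down to milliseconds.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// produce mirrors func1: it waits, then sends its value once.
func produce(delay time.Duration, v int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(delay)
	out <- v
}

// relay mirrors func2. Several relays share out, so none of them closes it.
func relay(in <-chan int, out chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for range in {
		out <- "hello"
	}
}

// runPerIteration (hypothetical name) starts one producer and one relay per
// iteration and collects everything the relays emit.
func runPerIteration() []string {
	ch1 := make(chan int, 10)
	ch2 := make(chan string, 10)
	var producers, relays sync.WaitGroup
	for i := 0; i < 3; i++ {
		producers.Add(1)
		relays.Add(1)
		go produce(time.Duration(i)*10*time.Millisecond, i, ch1, &producers)
		go relay(ch1, ch2, &relays)
	}

	// Close each channel exactly once, after all of its senders are done.
	go func() {
		producers.Wait()
		close(ch1) // ends every relay's range loop
		relays.Wait()
		close(ch2) // ends the range loop below
	}()

	var got []string
	for v := range ch2 {
		got = append(got, v)
	}
	return got
}

func main() {
	for _, v := range runPerIteration() {
		fmt.Println(v)
	}
}
```

The closing goroutine is the design choice here: the receiver loop in runPerIteration can start immediately, and neither close depends on that loop having finished.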