The key pattern of that pipelines post is that you can view the contents of a channel as a stream of data, and write a set of cooperating goroutines that build up a data-processing stream graph. This can be a way to get some concurrency into a data-oriented application.
In terms of design, you may also find it more helpful to build up blocks that aren't tied to the goroutine structure, and wrap them in channels. This makes it much easier to test the lower-level code, and if you change your mind about running things in a goroutine or not, it's easier to add or remove the wrapper.
So in your example I'd start by refactoring the lowest-level tasks out into their own (synchronous) functions:
func fetch(ms int) int {
	time.Sleep(time.Duration(ms) * time.Millisecond)
	return ms
}

func report(ms int) string {
	return fmt.Sprintf("Hello after %d ms", ms)
}
Since the second half of your example is fairly synchronous, it's easy to adapt to the pipeline pattern. We write a function that consumes all of its input stream and produces a complete output stream, closing it when it's done.
func reportAll(mss <-chan int, out chan<- string) {
	for ms := range mss {
		out <- report(ms)
	}
	close(out)
}
The function that calls the asynchronous code is a little trickier. In the main loop of the function, every time you read a value, you need to launch a goroutine to process it. Then, after you've read everything out of the input channel, you need to wait for all of those goroutines to finish before closing the output channel. You can use a sync.WaitGroup and a small anonymous function here to help.
func fetchAll(mss <-chan int, out chan<- int) {
	var wg sync.WaitGroup
	for ms := range mss {
		wg.Add(1)
		go func(ms int) {
			defer wg.Done()
			out <- fetch(ms)
		}(ms)
	}
	wg.Wait()
	close(out)
}
It's also helpful here (because writes to an unbuffered channel block until a reader is ready) to write another function to seed the input values.
func produceInputs(mss chan<- int) {
	for ms := 1000; ms > 0; ms -= 300 {
		mss <- ms
	}
	close(mss)
}
Now your main function needs to create the channels between these and run the final consumer.
// main is the entry point to the program.
//
//	              mss          fetched       results
//	produceInputs --> fetchAll --> reportAll --> main
func main() {
	mss := make(chan int)
	fetched := make(chan int)
	results := make(chan string)
	go produceInputs(mss)
	go fetchAll(mss, fetched)
	go reportAll(fetched, results)
	for val := range results {
		fmt.Println(val)
	}
}
https://play.golang.org/p/V9Z7ECUVIJL is a complete example.
I've avoided manually passing around sync.WaitGroups here (and tend to avoid that in general: you wouldn't have a WaitGroup unless you're explicitly calling something at the top level of a goroutine, so pushing the WaitGroup management up to the caller makes the code more modular; see my fetchAll function above for an example). How do I know all of my goroutines have finished? We can trace through:
- If I've reached the end of main, then the results channel has been closed.
- The results channel is the output channel of reportAll; if it's closed, then that function reached the end of its execution, and if that happened, then the fetched channel is closed.
- The fetched channel is the output channel of fetchAll; ...
Another way to look at this is that as soon as the pipeline's source (produceInputs) closes its output channel and finishes, that "I'm done" signal flows down the pipeline and causes the downstream steps to close their output channels and finish too.
The blog post mentions a separate explicit close channel. I haven't gone into that here at all. Since it was written, though, the standard library gained the context package, which is now the standard idiom for managing cancellation. You'd need to use a select statement in the body of the main loop, which makes the handling a little more complicated. This might look like:
func reportAllCtx(ctx context.Context, mss <-chan int, out chan<- string) {
	defer close(out)
	for {
		select {
		case <-ctx.Done():
			// A bare "break" here would only leave the select
			// statement, not the for loop, so return instead.
			return
		case ms, ok := <-mss:
			if !ok {
				return // input channel closed; we're done
			}
			out <- report(ms)
		}
	}
}