If your intent is to propagate a signal along the pipeline to communicate when previous pipeline stages have completed and will produce no further values, you can do this synchronously by closing the channel after each pipeline stage returns. The following code does so by wrapping the invocation of each pipeline worker:
func startWork(val job, in, out chan interface{}) {
val(in, out)
// out will be closed after val returns
close(out)
}
// Later, in ExecutePipeline, start your worker by calling startWork
func ExecutePipeline(jobs ...job) {
// ...
for _, val := range jobs {
// ...
go startWork(val, in, out)
}
}
Avoiding multiple channel closure
I don't have any guarantee that out
will be closed at the end of each function
Conversely, if any worker can close a channel, this is problematic; the subsequent call in startWork
to close the channel will panic if you attempt to close an already-closed channel.
In this simple implementation, workers must delegating channel closure to the code which supervises the pipeline to avoid causing your program to panic.
Handling in-band signalling
As the signalling is passed in-band (in the same channel as the data), care may be required in the implementation of your pipeline workers to ensure they differentiate between
- reads of a value from an open channel, and
- reads of a zero value from a closed channel
range
ing over a channel in a for
loop will automatically break the loop when the channel is closed. If you implement your own logic to read from the channel, you will need to ascertain when the read trivially succeeds with a zero value because the channel is closed. This can be achieved using the multi-valued assignment form of the receive operator, which will return a boolean when a read from the channel was of a zero value because the channel was closed and empty.
func someWorker(in, out chan interface{}) {
for {
val, open := <-in
if !open {
// Read of val was the zero value of "in", indicating the channel
// is closed.
break // or, if appropriate, return
}
}
}