I am new to Go and couldn't find an answer to this problem. I read a CSV file in a producer, do some processing that may take time, and then send the output to a consumer via a channel. There is a chain of producer-consumers, and any producer may end up being slower than its consumer.
producer(1 goroutine) -> chan0 -> consumer-producer-1(>1 goroutines) -> chan1 -> consumer-producer-2(>1 goroutines) -> chan2 -> consumer(>1 goroutines)
There can be up to 15 consumers here.
Now the problem that I face is how to decide on the consumer side if the producer is done, and we can stop processing.
What I need to achieve is:
- Once the producer is done, all consumers should eventually finish the remaining data, do some cleanup, and exit.
- If a consumer doesn't get any data for a specific timeout period, it can exit (with a signal, preferably) without blocking any further.
- This applies to every producer-consumer pair along the chain.
I have used the following approach.
- Keep a signal channel alongside each data channel, and publish one "done" on it for each goroutine of the next consumer.
- After reading a "done", each consumer reads the data still buffered in its input channel and then puts, say, 5 "done" messages on the next signal channel. Only 5 are sent in total, not 5 per goroutine (using https://golang.org/pkg/sync/#Once.Do).
Below is what I have come up with so far.
    processRemaining := false
    for !processRemaining {
        select {
        case stuff, ok := <-input_messages:
            if !ok { // channel has been closed
                processRemaining = true
                break // leaves the select; the for condition ends the loop
            }
            result := do_stuff(stuff)
            if result != nil {
                // send to channel output_messages
            }
        case sig := <-input_signals: // signaled to stop
            fmt.Println("received signal", sig)
            processRemaining = true
        default:
            fmt.Println("no activity")
        }
    }
    if processRemaining {
        // drain whatever is still in the input channel
        for stuff := range input_messages {
            result := do_stuff(stuff)
            if result != nil {
                // send to channel output_messages
            }
        }
        // send "output_routine" number of "done" to a channel "output_signals".
    }
But even with this approach, I can't think of a way to make a consumer behave as if "input_messages" had been closed when nothing arrives on it for, say, 10 seconds.
Are there any problems I am overlooking with this approach? What are the possible ways (or concurrency patterns) to approach this problem, while ensuring that:
- All the subsequent channels are closed once the first channel, "chan0", is closed.
- All the producers of a stage are notified before their output channel is closed, and the channel is closed only once they have all finished their writes.
- If a consumer gets no data from a channel for a specified timeout, it should treat the channel as closed and unblock itself.
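To make the first two points concrete, here is a rough sketch of the behaviour I am after, using `sync.WaitGroup` so that each stage closes its output exactly once after all of its goroutines have finished. The `stage` function, the `int` payload, and the worker counts are assumptions for illustration only, and the timeout requirement is not covered here.

```go
package main

import (
	"fmt"
	"sync"
)

// stage (hypothetical helper) starts `workers` goroutines that read from in,
// apply fn, and write to the returned channel. The output is closed exactly
// once, after every worker has returned, so closing chan0 cascades down the
// chain without any separate signal channel.
func stage(in <-chan int, workers int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for v := range in { // ends when in is closed and drained
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait() // all writers are done
		close(out)
	}()
	return out
}

func main() {
	chan0 := make(chan int)
	go func() {
		defer close(chan0) // the single producer closes chan0 when done
		for i := 1; i <= 5; i++ {
			chan0 <- i
		}
	}()

	chan1 := stage(chan0, 3, func(v int) int { return v * 2 })
	chan2 := stage(chan1, 3, func(v int) int { return v + 1 })

	sum := 0
	for v := range chan2 { // final consumer
		sum += v
	}
	fmt.Println(sum) // prints 35
}
```

With this shape, no "done" messages need to be counted per goroutine, because `range` on a closed channel terminates every reader of that channel.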