2018-09-01 11:06
浏览 94

如何在生产者缓慢,消费者快速进入的情况下处理频道关闭同步? [关闭]

I am new to go, and couldn't find an answer to this problem. What I am doing, is to read a CSV file in producer, doing some stuff which may take time, and then sending the output to a consumer via a channel. There's a chain of producer-consumers, and any producer may end up being slower than it's consumer.

producer(1 goroutine) -> chan0 -> consumer-producer-1(>1 goroutines) -> chan1 -> consumer-producer-2(>1 goroutines) -> chan2 -> consumer(>1 goroutines)

There can be up to 15 consumers here.

Now the problem that I face is how to decide on the consumer side if the producer is done, and we can stop processing.

What I need to achieve is:

  1. once producer is done, all consumers should eventually do some cleanup and exit after finishing the remaining
  2. If a consumer doesn't get any data for a specific timeout period, it can exit(with a signal, preferably) without blocking any further.
  3. It happens for all the producer-consumer pair across the sequence.

I have used the following approach.

  1. To keep a signal channel along with each data channel, and to publish a "done", for each goroutine of its next consumer.
  2. After reading it, each consumer should just read the remaining buffered data in the channel and then put, say 5 "done" on next signal channel. Ensuring that it's only 5, and not 5 for each goroutine (using https://golang.org/pkg/sync/#Once.Do).
  3. Below is what I could think of till here.

    processRemaining = false
    for processRemaining == false{
            select {
            case stuff, ok := <-input_messages:
                    if ok == false { // if channel has been closed
                        processRemaining = true
                    if result != nil {
                            //send to channel output_messages
            case sig := <-input_signals: // if signaled to stopped.
                    fmt.Println("received signal", sig)
                    processRemaining = true
                    fmt.Println("no activity")
    if processRemaining {
            for stuff := range input_messages {
                    if result != nil {
                            //send to channel output_messages
            // send "output_routine" number of "done" to a channel "output_signals".

But even in this approach, I am unable to think of any way to behave the same way as closed "input_messages" channel, if nothing is available for, say 10 seconds.

Are there any problems I am ignoring with this approach. What are the possible way (or concurrency patterns) to approach this problem? ensuring:

  1. All the subsequent channels are closed, once first "chan0" is closed.
  2. All the producers are updated before closing their output channel, and the channel is closed only once they all have finished their writes.
  3. If a consumer gets no data from a channel for a specified timeout, it should treat it as closed, and unblocks itself.
  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 邀请回答

1条回答 默认 最新

  • doubeng3412 2018-09-01 11:59

    Use a sync.WaitGroup to keep track of the number of running goroutines. Each goroutine exits after it no longer gets data from the channel. Once the WaitGroup is done, the cleanup can be done.

    Something like this:

    import (
    type Data interface{} // just an example
    type Consumer interface {
            Consume(Data) Data
            Count() int
            Timeout() time.Duration
    func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
            wg := sync.WaitGroup{}
            for i := 0; i < consumer.Count(); i++ {
                    go func() {
                            for {
                                    select {
                                    case v, ok := <-inCh: // 'ok' says if the channel is still open
                                            if !ok {
                                                    break consumeLoop
                                            outCh <- consumer.Consume(v)
                                    case <-time.After(consumer.Timeout()):
                                            break consumeLoop

    At each stage of the pipeline, you can use a similar process as the above to start the consumers.

    点赞 评论

相关推荐 更多相似问题