duanlian1960 2018-09-01 11:06
Accepted

How to synchronize channel closing when the producer is slow and the consumers are fast? [closed]

I am new to Go and couldn't find an answer to this problem. I read a CSV file in a producer, do some processing that may take time, and then send the output to a consumer via a channel. There is a chain of producer-consumers, and any producer may end up being slower than its consumer.

producer(1 goroutine) -> chan0 -> consumer-producer-1(>1 goroutines) -> chan1 -> consumer-producer-2(>1 goroutines) -> chan2 -> consumer(>1 goroutines)

There can be up to 15 consumers here.

Now the problem I face is how the consumer side can tell that the producer is done, so that processing can stop.
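For reference, Go's built-in way for a producer to say "I am done" is to close the channel: values already buffered are still delivered, after which a range loop ends and a receive of the form v, ok := <-ch reports ok == false. A minimal sketch (produce and consume are illustrative names, not from the question):

```go
package main

import "fmt"

// produce sends n values and then closes ch to tell consumers it is done.
func produce(ch chan<- int, n int) {
	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch) // only the sender closes; receivers never close a channel they read from
}

// consume reads until the channel is closed and drained, then returns what it saw.
func consume(ch <-chan int) []int {
	var got []int
	for v := range ch { // the loop ends after close, once the buffer is empty
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 2)
	go produce(ch, 5)
	fmt.Println(consume(ch)) // [0 1 2 3 4]
	v, ok := <-ch           // receive on a closed, empty channel
	fmt.Println(v, ok)      // 0 false
}
```

With several consumer goroutines ranging over the same channel, a single close reaches all of them, so no per-goroutine "done" message is needed for that part.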

What I need to achieve is:

  1. Once the producer is done, all consumers should finish processing the remaining data, do some cleanup, and exit.
  2. If a consumer doesn't receive any data for a specific timeout period, it can exit (preferably with a signal) without blocking any further.
  3. The same behavior applies to every producer-consumer pair along the chain.

I have used the following approach.

  1. Keep a signal channel alongside each data channel, and publish one "done" on it for each goroutine of the next consumer.
  2. After reading a "done", each consumer reads the remaining buffered data in its input channel and then puts, say, 5 "done" on the next signal channel, ensuring that it is only 5 in total and not 5 per goroutine (using https://golang.org/pkg/sync/#Once.Do).
  3. Below is what I have come up with so far.

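    import (
            "fmt"
            "sync"
    )
    
    type Data interface{}
    
    // do_stuff stands in for the (possibly slow) per-item work; it may return nil.
    func do_stuff(stuff Data) Data { return stuff }
    
    // worker is one goroutine of a stage; once is shared by all goroutines of that stage.
    func worker(input_messages <-chan Data, input_signals <-chan string,
            output_messages chan<- Data, output_signals chan<- string,
            output_routines int, once *sync.Once) {
            processRemaining := false
            for !processRemaining {
                    // No default case: with one, this loop would spin and print while idle.
                    select {
                    case stuff, ok := <-input_messages:
                            if !ok { // channel has been closed
                                    processRemaining = true
                                    break // leaves the select; the loop condition then ends the loop
                            }
                            if result := do_stuff(stuff); result != nil {
                                    output_messages <- result
                            }
                    case sig := <-input_signals: // signaled to stop
                            fmt.Println("received signal", sig)
                            processRemaining = true
                    }
            }
            // Process whatever is left; this blocks until input_messages is closed.
            for stuff := range input_messages {
                    if result := do_stuff(stuff); result != nil {
                            output_messages <- result
                    }
            }
            // Send output_routines "done" signals once per stage, not once per goroutine.
            once.Do(func() {
                    for i := 0; i < output_routines; i++ {
                            output_signals <- "done"
                    }
            })
    }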
    

But even with this approach, I can't think of any way to make a consumer behave as if input_messages had been closed when nothing has arrived for, say, 10 seconds.

Are there any problems with this approach that I am overlooking? What are possible ways (or concurrency patterns) to approach this problem while ensuring that:

  1. All subsequent channels are closed once the first one, chan0, is closed.
  2. All producer goroutines of a stage learn that their input is finished before their output channel is closed, and the channel is closed only once all of them have finished writing.
  3. If a consumer gets no data from a channel within a specified timeout, it treats the channel as closed and unblocks itself.

1 Answer

  • doubeng3412 2018-09-01 11:59

    Use a sync.WaitGroup to keep track of the running goroutines. Each goroutine exits when its input channel is closed or when no data has arrived for the timeout period. Once all of them have exited (wg.Wait returns), the cleanup can run and the output channel can be closed.

    Something like this:

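    import (
            "sync"
            "time"
    )
    
    type Data interface{} // just an example
    
    // Consumer describes one pipeline stage. Consume is called concurrently
    // from Count() goroutines, so it must be safe for concurrent use.
    type Consumer interface {
            Consume(Data) Data
            CleanUp()
            Count() int
            Timeout() time.Duration
    }
    
    // StartConsumers blocks until every consumer goroutine has exited, then
    // calls CleanUp and closes outCh, which lets the next stage finish in turn.
    func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
            var wg sync.WaitGroup
            for i := 0; i < consumer.Count(); i++ {
                    wg.Add(1)
                    go func() {
                            defer wg.Done()
                    consumeLoop:
                            for {
                                    select {
                                    case v, ok := <-inCh: // 'ok' says if the channel is still open
                                            if !ok {
                                                    break consumeLoop
                                            }
                                            outCh <- consumer.Consume(v)
                                    case <-time.After(consumer.Timeout()):
                                            // No data for Timeout() since the last value: treat inCh as closed.
                                            break consumeLoop
                                    }
                            }
                    }()
            }
            wg.Wait() // every writer to outCh has finished
    
            consumer.CleanUp()
            close(outCh) // safe: no goroutine can send on outCh any more
    }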
    

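    Use the same function for every stage of the pipeline. Because StartConsumers blocks until its stage has finished, start each stage in its own goroutine (go StartConsumers(...)); otherwise the call for the first stage does not return until that stage is done, and with an unbuffered output channel nothing would be reading its output in the meantime.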
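A sketch of that wiring for producer -> chan0 -> stage 1 -> chan1 -> stage 2 -> chan2, assuming a made-up adder consumer with int payloads (adder and runPipeline are hypothetical names; StartConsumers is a condensed copy of the answer's function so the block compiles on its own):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Data interface{}

type Consumer interface {
	Consume(Data) Data
	CleanUp()
	Count() int
	Timeout() time.Duration
}

// StartConsumers: condensed copy of the answer's function (same behavior).
func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
	var wg sync.WaitGroup
	for i := 0; i < consumer.Count(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case v, ok := <-inCh:
					if !ok {
						return // input closed
					}
					outCh <- consumer.Consume(v)
				case <-time.After(consumer.Timeout()):
					return // input idle for too long
				}
			}
		}()
	}
	wg.Wait()
	consumer.CleanUp()
	close(outCh)
}

// adder is a hypothetical Consumer that adds n to each int value.
type adder struct {
	n       int
	workers int
}

func (a adder) Consume(d Data) Data    { return d.(int) + a.n }
func (a adder) CleanUp()               {}
func (a adder) Count() int             { return a.workers }
func (a adder) Timeout() time.Duration { return time.Second }

// runPipeline sends 1..n through two stages and sums what comes out of chan2.
func runPipeline(n int) int {
	chan0, chan1, chan2 := make(chan Data), make(chan Data), make(chan Data)
	go func() {
		for i := 1; i <= n; i++ {
			chan0 <- i
		}
		close(chan0) // closing chan0 leads to chan1 and then chan2 being closed
	}()
	// Each stage gets its own goroutine because StartConsumers blocks in wg.Wait.
	go StartConsumers(adder{n: 10, workers: 3}, chan0, chan1)
	go StartConsumers(adder{n: 100, workers: 5}, chan1, chan2)
	sum := 0
	for v := range chan2 { // ends when stage 2 closes chan2
		sum += v.(int)
	}
	return sum
}

func main() {
	fmt.Println(runPipeline(10)) // 1155
}
```

One design caveat: the timeout branch only protects a goroutine waiting on its input. The send outCh <- ... sits outside the select, so if a downstream stage has already exited on its own timeout, an upstream send can block forever; covering that would take something extra, such as a shared done channel that every send also selects on.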

    This answer was selected as the best answer by the asker.