duanlian1960 2018-09-01 11:06
浏览 95
已采纳

如何在生产者缓慢,消费者快速进入的情况下处理频道关闭同步? [关闭]

I am new to go, and couldn't find an answer to this problem. What I am doing, is to read a CSV file in producer, doing some stuff which may take time, and then sending the output to a consumer via a channel. There's a chain of producer-consumers, and any producer may end up being slower than it's consumer.

producer(1 goroutine) -> chan0 -> consumer-producer-1(>1 goroutines) -> chan1 -> consumer-producer-2(>1 goroutines) -> chan2 -> consumer(>1 goroutines)

There can be up to 15 consumers here.

Now the problem that I face is how to decide on the consumer side if the producer is done, and we can stop processing.

What I need to achieve is:

  1. once producer is done, all consumers should eventually do some cleanup and exit after finishing the remaining
  2. If a consumer doesn't get any data for a specific timeout period, it can exit(with a signal, preferably) without blocking any further.
  3. It happens for all the producer-consumer pair across the sequence.

I have used the following approach.

  1. To keep a signal channel along with each data channel, and to publish a "done", for each goroutine of its next consumer.
  2. After reading it, each consumer should just read the remaining buffered data in the channel and then put, say 5 "done" on next signal channel. Ensuring that it's only 5, and not 5 for each goroutine (using https://golang.org/pkg/sync/#Once.Do).
  3. Below is what I could think of till here.

    processRemaining = false
    for processRemaining == false{
            select {
            case stuff, ok := <-input_messages:
                    do_stuff(stuff)
                    if ok == false { // if channel has been closed
                        processRemaining = true
                    }
                    if result != nil {
                            //send to channel output_messages
                    }
            case sig := <-input_signals: // if signaled to stopped.
                    fmt.Println("received signal", sig)
                    processRemaining = true
            default:
                    fmt.Println("no activity")
            }
    }
    if processRemaining {
            for stuff := range input_messages {
                    do_stuff(stuff)
                    if result != nil {
                            //send to channel output_messages
                    }
            }
            // send "output_routine" number of "done" to a channel "output_signals".
    }
    

But even in this approach, I am unable to think of any way to behave the same way as closed "input_messages" channel, if nothing is available for, say 10 seconds.

Are there any problems I am ignoring with this approach. What are the possible way (or concurrency patterns) to approach this problem? ensuring:

  1. All the subsequent channels are closed, once first "chan0" is closed.
  2. All the producers are updated before closing their output channel, and the channel is closed only once they all have finished their writes.
  3. If a consumer gets no data from a channel for a specified timeout, it should treat it as closed, and unblocks itself.
  • 写回答

1条回答 默认 最新

  • doubeng3412 2018-09-01 11:59
    关注

    Use a sync.WaitGroup to keep track of the number of running goroutines. Each goroutine exits after it no longer gets data from the channel. Once the WaitGroup is done, the cleanup can be done.

    Something like this:

    import (
            "sync"
            "time"
    )
    
    type Data interface{} // just an example
    
    type Consumer interface {
            Consume(Data) Data
            CleanUp()
            Count() int
            Timeout() time.Duration
    }
    
    func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
            wg := sync.WaitGroup{}
            for i := 0; i < consumer.Count(); i++ {
                    wg.Add(1)
                    go func() {
                    consumeLoop:
                            for {
                                    select {
                                    case v, ok := <-inCh: // 'ok' says if the channel is still open
                                            if !ok {
                                                    break consumeLoop
                                            }
                                            outCh <- consumer.Consume(v)
                                    case <-time.After(consumer.Timeout()):
                                            break consumeLoop
                                    }
                            }
    
                            wg.Done()
                    }()
            }
            wg.Wait()
    
            consumer.CleanUp()
            close(outCh)
    }
    

    At each stage of the pipeline, you can use a similar process as the above to start the consumers.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 用C语言输入方程怎么
  • ¥15 网站显示不安全连接问题
  • ¥15 github训练的模型参数无法下载
  • ¥15 51单片机显示器问题
  • ¥20 关于#qt#的问题:Qt代码的移植问题
  • ¥50 求图像处理的matlab方案
  • ¥50 winform中使用edge的Kiosk模式
  • ¥15 关于#python#的问题:功能监听网页
  • ¥15 怎么让wx群机器人发送音乐
  • ¥15 fesafe材料库问题