douri4459 2018-10-14 12:39
Accepted

Implementing a pipeline of worker functions

I'm implementing a pipeline of several worker functions connected by channels. All of them take (in, out chan interface{}) as input (each function receives the out of the previous one as its in).

I don't have any guarantee that out will be closed at the end of each function, so I'm wondering how should I check if a previous function has done its work. I've started with something like this:

func ExecutePipeline(jobs ...job) {
    out := make(chan interface{}, 10)
    for _, val := range jobs {
        in := out
        out := make(chan interface{})
        go val(in, out)
    }
}

I'm thinking about using WaitGroup somehow to use the end of function's goroutine as an indicator that it's done its work and close its out channel.
How can I do it?

1 answer

  • duanheye7423 2018-10-14 14:40

    If your intent is to propagate a signal along the pipeline to communicate when previous pipeline stages have completed and will produce no further values, you can do this synchronously by closing the channel after each pipeline stage returns. The following code does so by wrapping the invocation of each pipeline worker:

    func startWork(val job, in, out chan interface{}) {
        val(in, out)
        // out will be closed after val returns
        close(out)
    }
    
    // Later, in ExecutePipeline, start your worker by calling startWork
    func ExecutePipeline(jobs ...job) {
        // ...
        for _, val := range jobs {
            // ...
            go startWork(val, in, out)
        }
    }
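To make the wrapper concrete, here is a minimal end-to-end sketch of how the elided parts of ExecutePipeline might be filled in. It assumes that job is defined as func(in, out chan interface{}), that the first stage ignores its input, and that the pipeline should block until the last stage has finished. The runDemo helper and its three stages are hypothetical and exist only to exercise the pipeline.

```go
package main

import "fmt"

// job mirrors the worker signature from the question (an assumption).
type job func(in, out chan interface{})

// startWork runs one stage and closes its out channel once the stage returns.
func startWork(val job, in, out chan interface{}) {
	val(in, out)
	close(out)
}

// ExecutePipeline chains the jobs and blocks until the last stage has finished.
func ExecutePipeline(jobs ...job) {
	in := make(chan interface{})
	close(in) // the first stage has no upstream, so its input is closed right away
	for _, val := range jobs {
		out := make(chan interface{})
		go startWork(val, in, out)
		in = out // the next stage reads what this stage writes
	}
	// Drain the final channel; the loop ends when the last stage's out is closed.
	for range in {
	}
}

// runDemo is a hypothetical helper: it pushes 1, 2, 3 through a doubling
// stage and returns what the last stage collected.
func runDemo() []int {
	var got []int
	ExecutePipeline(
		func(in, out chan interface{}) {
			for i := 1; i <= 3; i++ {
				out <- i
			}
		},
		func(in, out chan interface{}) {
			for v := range in {
				out <- v.(int) * 2
			}
		},
		func(in, out chan interface{}) {
			for v := range in {
				got = append(got, v.(int))
			}
		},
	)
	return got
}

func main() {
	fmt.Println(runDemo()) // prints [2 4 6]
}
```

Because every out channel is closed by startWork, each downstream stage can simply range over its in channel, and draining the last channel doubles as the "everything is done" signal without a separate WaitGroup.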
    

    Avoiding multiple channel closure

    I don't have any guarantee that out will be closed at the end of each function

    Conversely, it is a problem if a worker closes its out channel itself: the subsequent close(out) in startWork will then panic, because closing an already-closed channel panics.

    In this simple implementation, workers must delegate channel closure to the code that supervises the pipeline, so that the program does not panic.
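The double-close panic can be observed in isolation with a small sketch. The closePanics helper is hypothetical and uses recover purely for demonstration; it is not a suggestion to guard close calls this way in a real pipeline.

```go
package main

import "fmt"

// closePanics is a hypothetical helper: it closes ch and reports whether
// the call to close panicked.
func closePanics(ch chan interface{}) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	close(ch)
	return false
}

func main() {
	out := make(chan interface{})
	fmt.Println(closePanics(out)) // prints false: the first close succeeds
	fmt.Println(closePanics(out)) // prints true: the second close panics
}
```

This is why a channel should have exactly one party responsible for closing it, which in the answer above is startWork.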


    Handling in-band signalling

    As the signalling is passed in-band (in the same channel as the data), care may be required in the implementation of your pipeline workers to ensure they differentiate between

    • reads of a value from an open channel, and
    • reads of a zero value from a closed channel

    Ranging over a channel in a for loop automatically ends the loop once the channel is closed and all remaining values have been received. If you implement your own logic to read from the channel, you need to detect when a receive succeeds trivially with a zero value because the channel is closed. The two-value form of the receive operator does this: its second result is a boolean that is true when the value was actually sent on the channel, and false when the zero value was returned because the channel is closed and empty.

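    func someWorker(in, out chan interface{}) {
        for {
            val, open := <-in
            if !open {
                // The channel is closed and drained; val is only the zero
                // value (nil) here and carries no data.
                break // or, if appropriate, return
            }
            // val is a real value sent by the previous stage; forwarding it
            // unchanged stands in for this worker's actual processing.
            out <- val
        }
    }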
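For comparison, the same worker can be sketched with range, which performs the closed-channel check implicitly. The names forwardAll and runForward are hypothetical. The driver deliberately sends a nil value to show that a zero value received from an open channel is still delivered and does not end the loop.

```go
package main

import "fmt"

// forwardAll is a hypothetical worker that copies every value from in to out.
// The range loop ends only when in is closed and drained.
func forwardAll(in, out chan interface{}) {
	for val := range in {
		out <- val
	}
}

// runForward is a hypothetical driver: it feeds values through forwardAll
// using buffered channels and returns everything that came out.
func runForward(values ...interface{}) []interface{} {
	in := make(chan interface{}, len(values))
	out := make(chan interface{}, len(values))
	for _, v := range values {
		in <- v
	}
	close(in) // signals "no more values" to the worker

	forwardAll(in, out)
	close(out) // the supervising code, not the worker, closes out

	var got []interface{}
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	got := runForward(1, nil, 3)
	fmt.Println(len(got), got[1] == nil) // prints 3 true
}
```

The explicit two-value receive is mainly needed when the read happens outside a range loop, for example inside a select statement.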
    
    This answer was selected by the asker as the best answer.