dtsc14683 2015-09-03 18:34
Views: 38
Accepted

Shut down the "worker" goroutine once the buffer is empty

I want my worker goroutine (ProcessToDo() in the code below) to wait until all "queued" work has been processed before shutting down.

The worker has a buffered "to do" channel, through which work is sent to it, and a "done" channel that tells it to start shutting down. The documentation says that when more than one case of a select is ready, one of them is chosen by "pseudo-random" selection, which means the shutdown (return) can be triggered before all the buffered work has been completed.

In the code sample below, I want all 20 messages to print...

package main

import (
    "time"
    "fmt"
)


func ProcessToDo(done chan struct{}, todo chan string) {
    for {
        select {
        case work, ok := <-todo:
            if !ok {
                fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
                return
            }
            fmt.Printf("todo: %q\n", work)
            time.Sleep(100 * time.Millisecond)
        case _, ok := <-done:
            if ok {
                fmt.Printf("Shutting down ProcessToDo - done message received!\n")
            } else {
                fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")
            }
            close(todo)
            return
        }
    }
}

func main() {

    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    time.Sleep(1 * time.Second)
    close(done)
    time.Sleep(4 * time.Second)
}

2 answers

  • douba1498 2015-09-03 18:55

    The done channel is completely unnecessary in your case, since you can signal the shutdown by closing the todo channel itself.

    Use for range on the channel, which keeps iterating until the channel is closed and its buffer has been emptied.
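    To illustrate that point in isolation, here is a minimal sketch showing that closing a buffered channel does not discard what is already queued. The helper name drain is made up for this example and is not part of the question's code.

```go
package main

import "fmt"

// drain (hypothetical helper) reads from ch until it is closed and empty,
// returning everything it received in order.
func drain(ch <-chan string) []string {
	var got []string
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	todo := make(chan string, 3)
	todo <- "a"
	todo <- "b"
	todo <- "c"
	close(todo) // closing does not throw away the buffered values

	fmt.Println(drain(todo)) // prints [a b c]

	// Once the buffer is empty, a receive on the closed channel reports ok == false.
	_, ok := <-todo
	fmt.Println(ok) // prints false
}
```

    So the receiver only sees the "closed" state after every buffered value has been delivered, which is exactly the ordering the question asks for.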

    You should still have a done channel, but only so that the worker goroutine can signal that it has finished its work, which lets the main goroutine continue or exit.

    This variant is equivalent to yours, is much simpler, and does not need time.Sleep() calls to wait for other goroutines (which would be error-prone and nondeterministic anyway). Try it on the Go Playground:

    package main

    import (
        "fmt"
        "time"
    )

    // ProcessToDo handles jobs until todo is closed and drained, then signals on done.
    func ProcessToDo(done chan struct{}, todo chan string) {
        for work := range todo {
            fmt.Printf("todo: %q\n", work)
            time.Sleep(100 * time.Millisecond) // simulate slow processing
        }
        fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
        done <- struct{}{} // Signal that we processed all jobs
    }

    func main() {
        done := make(chan struct{})
        todo := make(chan string, 100)

        go ProcessToDo(done, todo)

        for i := 0; i < 20; i++ {
            todo <- fmt.Sprintf("Message %02d", i)
        }

        fmt.Println("*** all messages queued ***")
        close(todo)
        <-done // Wait until the other goroutine finishes all jobs
    }
    

    Also note that worker goroutines should signal completion using defer, so that the main goroutine won't get stuck waiting if the worker returns in some unexpected way or panics. So the worker should rather start like this:

    defer func() {
        done <- struct{}{} // Signal that we processed all jobs
    }()
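    As a rough sketch of why the deferred signal matters, the worker below panics on one job and still signals done. The function name worker, the "bad" job, and the recover() call are assumptions added for this demo: an unrecovered panic would terminate the whole program, so recover is used here only to keep the example running.

```go
package main

import "fmt"

// worker (hypothetical name) processes jobs from todo and always signals on
// done when it exits, whether it returns normally or a job makes it panic.
func worker(done chan<- struct{}, todo <-chan string) {
	defer func() {
		// recover is an addition for this sketch so the program survives the panic.
		if r := recover(); r != nil {
			fmt.Println("worker recovered:", r)
		}
		done <- struct{}{} // runs on normal return and on panic alike
	}()

	for work := range todo {
		if work == "bad" {
			panic("cannot process " + work)
		}
		fmt.Println("todo:", work)
	}
}

func main() {
	done := make(chan struct{})
	todo := make(chan string, 10)

	go worker(done, todo)

	todo <- "ok"
	todo <- "bad" // makes the worker panic before the channel is drained
	close(todo)

	<-done // does not block forever: the deferred send still happens
	fmt.Println("main: worker finished")
}
```

    If the send on done were the last statement of the function instead of a deferred call, the panic would skip it and main would wait on a signal that never comes.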
    

    You can also use sync.WaitGroup to synchronize the main goroutine with the worker (that is, to wait for it). In fact, if you plan to use multiple worker goroutines, that is cleaner than reading multiple values from the done channel. It is also simpler to signal completion with a WaitGroup, because it has a Done() method (which is a plain function call), so you don't need an anonymous function:

    defer wg.Done()
    

    See JimB's answer for the complete example with WaitGroup.
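    Since that other answer is not reproduced here, the following is a minimal sketch of what a WaitGroup variant could look like. It is not JimB's code. It assumes a package-level wg so that ProcessToDo only needs the todo channel.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// wg is package-level in this sketch so the worker can call wg.Done()
// without receiving the WaitGroup as a parameter.
var wg sync.WaitGroup

// ProcessToDo drains the todo channel and marks itself finished on wg.
func ProcessToDo(todo <-chan string) {
	defer wg.Done()
	for work := range todo {
		fmt.Printf("todo: %q\n", work)
		time.Sleep(100 * time.Millisecond) // simulate slow processing
	}
	fmt.Println("Shutting down ProcessToDo - todo channel closed!")
}

func main() {
	todo := make(chan string, 100)

	wg.Add(1) // register the worker before starting it
	go ProcessToDo(todo)

	for i := 0; i < 20; i++ {
		todo <- fmt.Sprintf("Message %02d", i)
	}

	fmt.Println("*** all messages queued ***")
	close(todo)
	wg.Wait() // blocks until the worker has called wg.Done()
}
```

    Note that wg.Add(1) is called in main before the go statement, so wg.Wait() cannot run before the worker has been registered.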

    Using for range is also idiomatic if you want multiple worker goroutines: channels are synchronized, so you don't need any extra code to synchronize access to the todo channel or to the jobs received from it. And if you close the todo channel in main(), that properly signals all worker goroutines, while every queued job is still received and processed exactly once.

    Now take the variant that uses WaitGroup to make the main goroutine wait for the worker (JimB's answer): what if you want more than one worker goroutine, to process your jobs concurrently (and most likely in parallel)?

    The only thing you need to change in your code is to actually start several of them:

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go ProcessToDo(todo)
    }
    

    Without changing anything else, you now have a correct, concurrent application which receives and processes your jobs using 10 concurrent goroutines. We haven't used any "ugly" time.Sleep() to wait for other goroutines (there is one, but only to simulate slow processing), and you don't need any extra synchronization.
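    To check the "exactly once" claim, here is a self-contained sketch with several workers sharing one channel. The helper runWorkers and the tally map are assumptions made for this demo. The mutex guards only that map, which the demo needs in order to count; the channel itself needs no lock.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runWorkers (hypothetical helper) starts n workers on one shared todo channel
// and returns how many times each job was processed after all workers exit.
func runWorkers(n int, jobs []string) map[string]int {
	todo := make(chan string, len(jobs))
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards seen only; receiving from todo is already safe
		seen = make(map[string]int)
	)

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for work := range todo {
				time.Sleep(10 * time.Millisecond) // simulate slow processing
				mu.Lock()
				seen[work]++
				mu.Unlock()
			}
		}()
	}

	for _, j := range jobs {
		todo <- j
	}
	close(todo) // a single close signals every worker
	wg.Wait()
	return seen
}

func main() {
	jobs := make([]string, 20)
	for i := range jobs {
		jobs[i] = fmt.Sprintf("Message %02d", i)
	}

	seen := runWorkers(10, jobs)
	fmt.Println("distinct jobs processed:", len(seen)) // prints 20
	for _, j := range jobs {
		if seen[j] != 1 {
			fmt.Println("unexpected count for", j, seen[j])
		}
	}
}
```

    Each value sent on the channel is delivered to exactly one receiver, so no job shows up twice in the tally no matter how many workers are running.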
