普通网友 2016-06-05 16:03
浏览 458
已采纳

Golang中的事件驱动模式

I am using golang to implement a simple event driven worker. It's like this:

  go func() {
        for {
            select {
            case data := <-ch:
                time.Sleep(1)
                someGlobalMap[data.key] = data.value 
            }
        }
    }()

And the main function will create several goroutines, and each of them will do thing like this:

ch <- data
fmt.Println(someGlobalMap[data.key])

As you can see that, because my worker need some time to do the work, I will got nil result in my main function.How can I control this workflow properly?

  • 写回答

1条回答 默认 最新

  • dqayok7935 2016-06-05 18:32
    关注

    EDIT: I may have misread your question, I see that you mention that main will start many producer goroutines. I thought it was many consumer goroutines, and a single producer. Leaving the answer here in case it can be useful for others looking for that pattern, though the bullet points still apply to your case.

    So if I understand correctly your use-case, you can't expect to send on a channel and read the results immediately after. You don't know when the worker will process that send, you need to communicate between the goroutines, and that is done with channels. Assuming just calling a function with a return value doesn't work in your scenario, if you really need to send to a worker, then block until you have the result, you could send a channel as part of the data structure, and block-receive on it after the send, i.e.:

    resCh := make(chan Result)
    ch <- Data{key, value, resCh}
    res := <- resCh
    

    But you should probably try to break down the work as a pipeline of independent steps instead, see the blog post that I linked to in the original answer.


    Original answer where I thought it was a single producer - multiple consumers/workers pattern:

    This is a common pattern for which Go's goroutines and channels semantics are very well suited. There are a few things you need to keep in mind:

    • The main function will not automatically wait for goroutines to finish. If there's nothing else to do in the main, then the program exits and you don't have your results.

    • The global map that you use is not thread-safe. You need to synchronize access via a mutex, but there's a better way - use an output channel for results, which is already synchronized.

    • You can use a for..range over a channel, and you can safely share a channel between multiple goroutines. As we'll see, that makes this pattern quite elegant to write.

    Playground: https://play.golang.org/p/WqyZfwldqp

    For more on Go pipelines and concurrency patterns, to introduce error handling, early cancellation, etc.: https://blog.golang.org/pipelines

    Commented code for the use-case you mention:

    // could be a command-line flag, a config, etc.
    const numGoros = 10
    
    // Data is a similar data structure to the one mentioned in the question.
    type Data struct {
        key   string
        value int
    }
    
    func main() {
        var wg sync.WaitGroup
    
        // create the input channel that sends work to the goroutines
        inch := make(chan Data)
        // create the output channel that sends results back to the main function
        outch := make(chan Data)
    
        // the WaitGroup keeps track of pending goroutines, you can add numGoros
        // right away if you know how many will be started, otherwise do .Add(1)
        // each time before starting a worker goroutine.
        wg.Add(numGoros)
        for i := 0; i < numGoros; i++ {
            // because it uses a closure, it could've used inch and outch automaticaly,
            // but if the func gets bigger you may want to extract it to a named function,
            // and I wanted to show the directed channel types: within that function, you
            // can only receive from inch, and only send (and close) to outch.
            //
            // It also receives the index i, just for fun so it can set the goroutines'
            // index as key in the results, to show that it was processed by different
            // goroutines. Also, big gotcha: do not capture a for-loop iteration variable
            // in a closure, pass it as argument, otherwise it very likely won't do what
            // you expect.
            go func(i int, inch <-chan Data, outch chan<- Data) {
                // make sure WaitGroup.Done is called on exit, so Wait unblocks
                // eventually.
                defer wg.Done()
    
                // range over a channel gets the next value to process, safe to share
                // concurrently between all goroutines. It exits the for loop once
                // the channel is closed and drained, so wg.Done will be called once
                // ch is closed.
                for data := range inch {
                    // process the data...
                    time.Sleep(10 * time.Millisecond)
                    outch <- Data{strconv.Itoa(i), data.value}
                }
            }(i, inch, outch)
        }
    
        // start the goroutine that prints the results, use a separate WaitGroup to track
        // it (could also have used a "done" channel but the for-loop would be more complex, with a select).
        var wgResults sync.WaitGroup
        wgResults.Add(1)
        go func(ch <-chan Data) {
            defer wgResults.Done()
    
            // to prove it processed everything, keep a counter and print it on exit
            var n int
            for data := range ch {
                fmt.Println(data.key, data.value)
                n++
            }
    
            // for fun, try commenting out the wgResults.Wait() call at the end, the output
            // will likely miss this line.
            fmt.Println(">>> Processed: ", n)
        }(outch)
    
        // send work, wherever that comes from...
        for i := 0; i < 1000; i++ {
            inch <- Data{"main", i}
        }
    
        // when there's no more work to send, close the inch, so the goroutines will begin
        // draining it and exit once all values have been processed.
        close(inch)
    
        // wait for all goroutines to exit
        wg.Wait()
    
        // at this point, no more results will be written to outch, close it to signal
        // to the results goroutine that it can terminate.
        close(outch)
    
        // and wait for the results goroutine to actually exit, otherwise the program would
        // possibly terminate without printing the last few values.
        wgResults.Wait()
    }
    

    In real-life scenarios, where the amount of work is not known ahead of time, the closing of the in-channel could come from e.g. a SIGINT signal. Just make sure no code path can send work after the channel was closed as that would panic.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 对于相关问题的求解与代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 信号傅里叶变换在matlab上遇到的小问题请求帮助
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作
  • ¥15 求NPF226060磁芯的详细资料