dozoqn3347 2017-03-11 02:04
浏览 20
已采纳

半异步代码逻辑

I'm struggling to figure out a working design that would mix together synchronous flow with asynchronous behavior.

I've 4 components:

  1. Seeder
  2. Worker
  3. Publisher
  4. Updater

The only limitation I've is that once Seeder seeds data it must be blocked up until Updater is not fully finished with processing all tasks. The first 3 components could easily be synchronous but the Updater must work in parallel or it would take forever to finish the tasks.

So the flow is:

Seeder -> Worker -> Publisher -> Updater --> Seeder -> Worker -> Publisher -> Updater ...

and this flow must rotate forever.

The seeding and updating is towards a database. Unfortunately this particular database doesn't allow for a different design.

The best I got to is using sync.WaitGroup to sync the Updater goroutines and leave everything else in a synchronous state. The data to the Updater are provided through a channel.

Here is a simplified code (no errors, not much logic in).

func main() {
    var wg sync.WaitGroup
    c := make(chan Result, 100)

    for {
        data := Seeder()
        msgs := Worker(data)
        results := Publisher(msgs)

        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(){
                defer wg.Done()
                data := <- c

                // this is the updater
            }(&wg)
        }

        for _, result := range results {
            c <- result
        }
        wg.Wait()
    }
}

The result is that the code works up until it halts at some cycle and never moves forward. I've played with many variables, loading 100 rows instead of 10k and the result is not much different.

I also tried to pass a struct containing channels and run everything asynchronously but I've even harder time figuring out when Updater is finished so I can unblock the seeder.

Any pointers are appreciated.

  • 写回答

1条回答 默认 最新

  • douzhan8652 2017-03-11 18:27
    关注

    It is hard to tell because your code cannot be compiled and run, and it is not clear how you use c. At least one thing is sure : wg should be passed by reference, not by value (sync.WaitGroup has the nocopy annotation). Then, I suppose you use c to send values to the updater. But you don’t provide their code, so I can only guess. For example, suppose that the scheduling happens such that the first 9 goroutines take all there is to read in the channel; then, the last routine is blocked forever and will never release the WaitGroup. In that case, a simple solution is to create a fresh channel in each iteration of your outermost for loop (move line 3 down two lines) and close c right before calling wg.Wait(). Your updaters must be able to handle a read from a close channel.

    [edit] I think what you are looking for is something like this:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    // Result is a type
    type Result struct {
        I int
    }
    
    // Seeder is a function
    func Seeder() []int {
        fmt.Println("Seeding")
        return []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
    }
    
    // Worker is a function
    func Worker(data []int) []int {
        return data
    }
    
    // Publisher is a function
    func Publisher(data []int) []Result {
        var r []Result
        for i := 0; i < len(data); i++ {
            r = append(r, Result{I: data[i]})
        }
        return r
    }
    
    func updater(c chan Result, wg *sync.WaitGroup) {
        for _ = range c {
            // update here
            wg.Done()
        }
    }
    
    func main() {
        var wg sync.WaitGroup
    
        c := make(chan Result, 100)
        for i := 0; i < 10; i++ {
            go updater(c, &wg)
        }
    
        for {
            data := Seeder()
            msgs := Worker(data)
            results := Publisher(msgs)
    
            wg.Add(len(results))
            for _, result := range results {
                c <- result
            }
            wg.Wait()
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥40 复杂的限制性的商函数处理
  • ¥15 程序不包含适用于入口点的静态Main方法
  • ¥15 素材场景中光线烘焙后灯光失效
  • ¥15 请教一下各位,为什么我这个没有实现模拟点击
  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码