dozoqn3347 2017-03-11 02:04
浏览 20
已采纳

半异步代码逻辑

I'm struggling to figure out a working design that would mix together synchronous flow with asynchronous behavior.

I've 4 components:

  1. Seeder
  2. Worker
  3. Publisher
  4. Updater

The only limitation I've is that once Seeder seeds data it must be blocked up until Updater is not fully finished with processing all tasks. The first 3 components could easily be synchronous but the Updater must work in parallel or it would take forever to finish the tasks.

So the flow is:

Seeder -> Worker -> Publisher -> Updater --> Seeder -> Worker -> Publisher -> Updater ...

and this flow must rotate forever.

The seeding and updating is towards a database. Unfortunately this particular database doesn't allow for a different design.

The best I got to is using sync.WaitGroup to sync the Updater goroutines and leave everything else in a synchronous state. The data to the Updater are provided through a channel.

Here is a simplified code (no errors, not much logic in).

func main() {
    var wg sync.WaitGroup
    c := make(chan Result, 100)

    for {
        data := Seeder()
        msgs := Worker(data)
        results := Publisher(msgs)

        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(){
                defer wg.Done()
                data := <- c

                // this is the updater
            }(&wg)
        }

        for _, result := range results {
            c <- result
        }
        wg.Wait()
    }
}

The result is that the code works up until it halts at some cycle and never moves forward. I've played with many variables, loading 100 rows instead of 10k and the result is not much different.

I also tried to pass a struct containing channels and run everything asynchronously but I've even harder time figuring out when Updater is finished so I can unblock the seeder.

Any pointers are appreciated.

  • 写回答

1条回答 默认 最新

  • douzhan8652 2017-03-11 18:27
    关注

    It is hard to tell because your code cannot be compiled and run, and it is not clear how you use c. At least one thing is sure : wg should be passed by reference, not by value (sync.WaitGroup has the nocopy annotation). Then, I suppose you use c to send values to the updater. But you don’t provide their code, so I can only guess. For example, suppose that the scheduling happens such that the first 9 goroutines take all there is to read in the channel; then, the last routine is blocked forever and will never release the WaitGroup. In that case, a simple solution is to create a fresh channel in each iteration of your outermost for loop (move line 3 down two lines) and close c right before calling wg.Wait(). Your updaters must be able to handle a read from a close channel.

    [edit] I think what you are looking for is something like this:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    // Result is a type
    type Result struct {
        I int
    }
    
    // Seeder is a function
    func Seeder() []int {
        fmt.Println("Seeding")
        return []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
    }
    
    // Worker is a function
    func Worker(data []int) []int {
        return data
    }
    
    // Publisher is a function
    func Publisher(data []int) []Result {
        var r []Result
        for i := 0; i < len(data); i++ {
            r = append(r, Result{I: data[i]})
        }
        return r
    }
    
    func updater(c chan Result, wg *sync.WaitGroup) {
        for _ = range c {
            // update here
            wg.Done()
        }
    }
    
    func main() {
        var wg sync.WaitGroup
    
        c := make(chan Result, 100)
        for i := 0; i < 10; i++ {
            go updater(c, &wg)
        }
    
        for {
            data := Seeder()
            msgs := Worker(data)
            results := Publisher(msgs)
    
            wg.Add(len(results))
            for _, result := range results {
                c <- result
            }
            wg.Wait()
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥30 深度学习,前后端连接
  • ¥15 孟德尔随机化结果不一致
  • ¥15 apm2.8飞控罗盘bad health,加速度计校准失败
  • ¥15 求解O-S方程的特征值问题给出边界层布拉休斯平行流的中性曲线
  • ¥15 谁有desed数据集呀
  • ¥20 手写数字识别运行c仿真时,程序报错错误代码sim211-100
  • ¥15 关于#hadoop#的问题
  • ¥15 (标签-Python|关键词-socket)
  • ¥15 keil里为什么main.c定义的函数在it.c调用不了
  • ¥50 切换TabTip键盘的输入法