I'm struggling to find a working design that mixes a synchronous flow with asynchronous behavior.
I have 4 components:
- Seeder
- Worker
- Publisher
- Updater
The only constraint I have is that once Seeder has seeded data, it must block until Updater has fully finished processing all tasks. The first 3 components can easily be synchronous, but Updater must work in parallel, or it would take forever to finish the tasks.
So the flow is:
Seeder -> Worker -> Publisher -> Updater --> Seeder -> Worker -> Publisher -> Updater ...
and this cycle must repeat forever.
Both seeding and updating go against a database. Unfortunately, this particular database doesn't allow a different design.
The best I've come up with is using `sync.WaitGroup` to synchronize the Updater goroutines and leaving everything else synchronous. Data is passed to the Updater goroutines through a channel.
Here is a simplified version of the code (no error handling and very little logic):
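```go
func main() {
	var wg sync.WaitGroup
	c := make(chan Result, 100)
	for {
		data := Seeder()
		msgs := Worker(data)
		results := Publisher(msgs)
		// start 10 Updater goroutines for this cycle;
		// each one receives exactly one result from c and exits
		for i := 0; i < 10; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				result := <-c
				Updater(result) // this is the updater
			}()
		}
		// hand this cycle's results to the Updater goroutines
		for _, result := range results {
			c <- result
		}
		// block the next Seeder call until the Updater goroutines are done
		wg.Wait()
	}
}
```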
The code works for a while, but at some cycle it halts and never moves forward. I've experimented with many parameters, e.g. loading 100 rows instead of 10k, and the result is much the same.
I also tried passing a struct containing channels and running everything asynchronously, but then I had an even harder time figuring out when Updater is finished so that I can unblock Seeder.
Any pointers are appreciated.