I need help understanding why the following code does not work. I am building a pipeline and want a step that synchronizes values from two source channels. My source/producer code looks something like the code below (in my real code I read the text from a file). The sources are sorted, but a value is not guaranteed to be in both sources.
func Source() <-chan int {
    out := make(chan int, 5)
    go func() {
        // In my real code a file reader is opened here and closed with:
        // defer reader.Close()
        out <- 1
        out <- 2
        out <- 3
        out <- 4
        out <- 5
        out <- 7
        close(out)
    }()
    return out
}
and the synchronization code looks like this:
func Sync(a, b <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        av, ak := <-a
        bv, bk := <-b
        for ak || bk {
            if !ak || av < bv {
                out <- bv
                bv, bk = <-b
                continue
            }
            if !bk || bv > av {
                out <- av
                av, ak = <-a
                continue
            }
            out <- av
            av, ak = <-a
            bv, bk = <-b
        }
        close(out)
    }()
    return out
}
and my program looks something like this:
func main() {
    os := Source()
    ns := Source()
    for val := range Sync(ns, os) {
        fmt.Printf("[SYNCED] %v\n", val)
    }
}
The expected behaviour is that both sources buffer values into their channels, and my sync step first reads a value from the first source, then one from the second. It compares them and, if they are equal, sends the value out and continues to the next value in both channels. If they differ, it sends out the value that is behind, replaces it with a new one from the same channel, and makes the same comparison again.
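To make the expected behaviour concrete, here is a minimal sketch of the comparison rules described above. It is not my real code: fromSlice and syncSorted are hypothetical names used only for this illustration, and the values in the second source are made up so that the two inputs actually differ.

```go
package main

import "fmt"

// fromSlice is a hypothetical stand-in for the file-backed Source:
// it sends the given (already sorted) values and then closes the channel.
func fromSlice(vals ...int) <-chan int {
	out := make(chan int, 5)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// syncSorted reads from two sorted channels and sends each distinct
// value once, always advancing the side that is behind.
func syncSorted(a, b <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		av, ak := <-a
		bv, bk := <-b
		for ak || bk {
			switch {
			case !bk || (ak && av < bv):
				// b is drained, or a is behind: send a's value, advance a.
				out <- av
				av, ak = <-a
			case !ak || bv < av:
				// a is drained, or b is behind: send b's value, advance b.
				out <- bv
				bv, bk = <-b
			default:
				// Both open and equal: send once, advance both.
				out <- av
				av, ak = <-a
				bv, bk = <-b
			}
		}
	}()
	return out
}

func main() {
	a := fromSlice(1, 2, 3, 4, 5, 7)
	b := fromSlice(2, 3, 6, 7, 8)
	for val := range syncSorted(a, b) {
		fmt.Printf("[SYNCED] %v\n", val)
	}
}
```

With these two made-up inputs I would expect each of 1 through 8 to be printed exactly once, in ascending order.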
What happens is that the sync code appears to run several times for the same values, and I get output like [SYNCED] 1 several times. Why?
Please help me get this fixed!