I'm trying to implement a mapreduce-like method in Golang. My design is as follows:
- Map workers pull items off a mapper input channel and write their results to a mapper output channel.
- The mapper output channel is then read by a single goroutine. This routine maintains a map of previously seen key-value pairs. If the next item from the mapper output has a matching key, it sends both the new and the old value for that key to a reduce-input channel.
- The reduce-input pipeline reduces two values to one key-value pair, and submits the result to the same map-output channel.
This leads to a circular dependency between the mapper output and the reduce input, and I don't know how to signal that the mapper output is complete (so that the channel can be closed).
What is the best way of breaking this cyclic dependency or knowing when to close channels with such cyclical behavior?
The code below has a deadlock with the map output channel and the reduce input channel waiting on each other.
type MapFn func(input int) (int, int)
type ReduceFn func(a int, b int) int

type kvPair struct {
	k int
	v int
}

type reducePair struct {
	k  int
	v1 int
	v2 int
}

func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
	inputMapChan := make(chan int, len(input))
	outputMapChan := make(chan *kvPair, len(input))
	reduceInputChan := make(chan *reducePair)
	outputMapMap := make(map[int]int)

	// Feed the input values to the mappers.
	go func() {
		for _, v := range input { // range over the values, not the indices
			inputMapChan <- v
		}
		close(inputMapChan)
	}()

	// Mappers: turn each input into a key-value pair.
	for i := 0; i < nMappers; i++ {
		go func() {
			for v := range inputMapChan {
				k, v := mapFn(v)
				outputMapChan <- &kvPair{k, v}
			}
		}()
	}

	// Reducers: combine two values for one key and feed the result
	// back into the mapper output channel.
	for i := 0; i < nReducers; i++ {
		go func() {
			for v := range reduceInputChan {
				reduceValue := reduceFn(v.v1, v.v2)
				outputMapChan <- &kvPair{v.k, reduceValue}
			}
		}()
	}

	// Collector: outputMapChan is never closed, so this loop never ends.
	for v := range outputMapChan {
		key := v.k
		value := v.v
		other, ok := outputMapMap[key]
		if ok {
			delete(outputMapMap, key)
			reduceInputChan <- &reducePair{key, value, other}
		} else {
			outputMapMap[key] = value
		}
	}
	return outputMapMap, nil
}