drti52047
drti52047
2016-08-21 23:55

Golang:关闭存在循环依赖的通道(channel)

  • channel
  • mapreduce
  • concurrency
已采纳

I'm trying to implement a mapreduce-like method in Golang. My design is as follows:

  • Map workers pull items off a mapper input channel and output to a mapper output channel

  • The mapper output channel is then read by a single goroutine. This routine maintains a map of previously-seen key-value pairs. If the next item from the mapper output has a matching key, it sends both the new and old values with matching keys to a reduce-input channel.

  • The reduce-input pipeline reduces two values to one key-value pair, and submits the result to the same map-output channel.

This leads to a circular dependency between the mapper output and the reduce input, and I now do not know how to signal that the mapper output is complete (and close the channel).

What is the best way of breaking this cyclic dependency or knowing when to close channels with such cyclical behavior?

The code below has a deadlock with the map output channel and the reduce input channel waiting on each other.

type (
    // MapFn turns one input element into a key and a value, in that order.
    MapFn func(input int) (int, int)

    // ReduceFn combines two values that were emitted for the same key into one.
    ReduceFn func(a, b int) int
)

// kvPair carries a key together with one value produced for that key.
type kvPair struct {
    k, v int
}

// reducePair is one unit of reduce work: two values sharing key k that are
// waiting to be combined into a single value.
type reducePair struct {
    k      int
    v1, v2 int
}

// MapReduce is the question's original attempt at a map/reduce pipeline built
// from goroutines and channels. nMappers goroutines apply mapFn to items read
// from inputMapChan and write key/value pairs to outputMapChan. The calling
// goroutine reads outputMapChan, remembers one value per key in outputMapMap,
// and whenever a second value for a key arrives hands both values to one of
// the nReducers goroutines, which write the reduced pair back to outputMapChan.
//
// NOTE(review): as written this function never returns. Nothing ever closes
// outputMapChan, so the final range loop blocks forever once all work has
// been drained. This is the deadlock the question asks about.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
    // Both channels are buffered to len(input); reduceInputChan is unbuffered,
    // so a send on it blocks until a reducer is ready to receive.
    inputMapChan := make(chan int, len(input))
    outputMapChan := make(chan *kvPair, len(input))
    reduceInputChan := make(chan *reducePair)
    // Holds, per key, the value that is currently waiting for a partner.
    outputMapMap := make(map[int]int)
    // Feeder: pushes the work items and closes inputMapChan so the mappers'
    // range loops can end.
    go func() {
        // NOTE(review): with a single iteration variable, range over a slice
        // yields the index, not the element, so this sends 0..len(input)-1
        // rather than the values stored in input.
        for v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)
    }()
    // Mappers: each one exits when inputMapChan is closed and drained.
    for i := 0; i < nMappers; i++ {
        go func() {
            for v := range inputMapChan {
                // The v declared here shadows the loop variable v within the
                // loop body; it is the mapped value, not the input.
                k, v := mapFn(v)
                outputMapChan <- &kvPair{k, v}
            }
        }()
    }
    // Reducers: combine two values for one key and feed the result back into
    // outputMapChan. reduceInputChan is never closed, so these goroutines
    // never leave their range loops.
    for i := 0; i < nReducers; i++ {
        go func() {
            for v := range reduceInputChan {
                reduceValue := reduceFn(v.v1, v.v2)
                outputMapChan <- &kvPair{v.k, reduceValue}
            }
        }()
    }
    // Dispatcher: pairs up values by key. It receives both the mappers'
    // output and the reducers' output on the same channel, which is the
    // cyclic dependency described in the question.
    for v := range outputMapChan {
        key := v.k
        value := v.v
        other, ok := outputMapMap[key]
        if ok {
            // A value for this key is already waiting: remove it and send
            // both values off to be reduced.
            delete(outputMapMap, key)
            reduceInputChan <- &reducePair{key, value, other}
        } else {
            // First value seen for this key (so far): park it.
            outputMapMap[key] = value
        }
    }
    // Only reachable if outputMapChan were closed, which never happens here.
    return outputMapMap, nil
}
  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享
  • 邀请回答

1条回答

  • doujia4041 doujia4041 5年前

    Try this:

    package main
    
    import "fmt"
    import "sync"
    import "sync/atomic"
    import "runtime"
    import "math/rand"
    import "time"
    
    type (
        // MapFn turns one input element into a key/value pair.
        MapFn func(input int) *kvPair

        // ReduceFn combines two values that were emitted for the same key into one.
        ReduceFn func(a, b int) int
    )
    
    // kvPair carries a key together with one value produced for that key.
    type kvPair struct {
        k, v int
    }
    
    // reducePair is one unit of reduce work: two values sharing key k that are
    // waiting to be combined into a single value.
    type reducePair struct {
        k      int
        v1, v2 int
    }
    
    // MapReduce applies mapFn to every element of input on nMappers goroutines
    // and then keeps combining values that share a key with reduceFn, on
    // nReducers goroutines, until exactly one value per key is left. The
    // result maps each key to its final value.
    //
    // Values for a key are paired in the order they happen to arrive, which
    // depends on goroutine scheduling; reduceFn should therefore be associative
    // and commutative if a reproducible result is required. reduceFn is called
    // as reduceFn(newlyArrived, previouslyStored).
    //
    // Worker counts below 1 are treated as 1. mapFn must not return nil. The
    // returned error is always nil; it is kept so the signature stays stable.
    // Every goroutine started by a call has exited by the time the call returns.
    func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
        outputMapMap := make(map[int]int)
        if len(input) == 0 {
            return outputMapMap, nil
        }
        // Without at least one worker of each kind the pipeline could not make
        // progress: no mapper means the input is never processed, and no
        // reducer means the first duplicate key blocks forever.
        if nMappers < 1 {
            nMappers = 1
        }
        if nReducers < 1 {
            nReducers = 1
        }

        inputMapChan := make(chan int, len(input))
        // At most len(input) values exist at any time (mapping is one-to-one
        // and reducing turns two values into one), so sends on this channel
        // never block. That is what keeps the dispatcher <-> reducer cycle
        // free of deadlocks.
        outputMapChan := make(chan *kvPair, len(input))
        reduceInputChan := make(chan *reducePair)

        // The buffer holds the whole input, so filling it inline cannot block.
        for _, v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)

        // pending is the number of values that are currently NOT at rest in
        // outputMapMap: unmapped inputs, values queued on outputMapChan and
        // values held by a reducer. It is updated by the dispatcher and by the
        // reducers, hence the atomic operations. The computation is complete
        // exactly when it reaches zero.
        pending := int64(len(input))

        var workers sync.WaitGroup
        workers.Add(nMappers + nReducers)

        for i := 0; i < nMappers; i++ {
            go func() {
                defer workers.Done()
                for v := range inputMapChan {
                    outputMapChan <- mapFn(v)
                }
            }()
        }

        for i := 0; i < nReducers; i++ {
            go func() {
                defer workers.Done()
                for p := range reduceInputChan {
                    reduced := reduceFn(p.v1, p.v2)
                    // Two in-flight values have become one. Decrementing
                    // before the send guarantees the dispatcher never sees
                    // the result while the counter is still too high.
                    atomic.AddInt64(&pending, -1)
                    outputMapChan <- &kvPair{p.k, reduced}
                }
            }()
        }

        // Dispatcher: runs on the calling goroutine, which is the only one
        // that touches outputMapMap.
        for {
            v := <-outputMapChan
            other, seen := outputMapMap[v.k]
            if !seen {
                // The value comes to rest in the map. If nothing else is in
                // flight, every key has been fully reduced.
                outputMapMap[v.k] = v.v
                if atomic.AddInt64(&pending, -1) == 0 {
                    break
                }
                continue
            }
            // The stored value goes back in flight together with the new one.
            delete(outputMapMap, v.k)
            atomic.AddInt64(&pending, 1)
            reduceInputChan <- &reducePair{v.k, v.v, other}
        }

        // Let the reducers leave their range loops, then wait for all workers
        // so that no goroutine outlives the call.
        close(reduceInputChan)
        workers.Wait()
        return outputMapMap, nil
    }
    
    // main times one MapReduce run (two mappers, two reducers, mp and rdc as
    // the map and reduce functions) over rand.Perm(1000000), then prints the
    // elapsed time and the resulting map. It panics if MapReduce reports an
    // error.
    func main() {
        fmt.Println("NumCPU =", runtime.NumCPU())

        // The clock starts before the input is generated, so the reported
        // duration includes the cost of rand.Perm.
        start := time.Now()
        data := rand.Perm(1000000)
        // Small hand-written input, handy when debugging:
        // data = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}

        result, err := MapReduce(mp, rdc, data, 2, 2)
        if err != nil {
            panic(err)
        }
        elapsed := time.Since(start)

        fmt.Println(elapsed) // the original author noted 883ms here
        fmt.Println(result)
        fmt.Println("done.")
    }
    
    // mp is the sample map function: the low three bits of input become the
    // key and the remaining bits (input >> 3) become the value.
    func mp(input int) *kvPair {
        pair := new(kvPair)
        pair.k = input & 7
        pair.v = input >> 3
        return pair
    }
    // rdc is the sample reduce function: it shifts b left by three bits and
    // ORs a into the result. OR-ing with zero changes nothing, so a == 0 needs
    // no special case.
    func rdc(a int, b int) int {
        return b<<3 | a
    }
    
    点赞 评论 复制链接分享

为你推荐