drti52047 2016-08-21 23:55
浏览 17
已采纳

Golang:如何关闭存在循环依赖的通道(channel)

I'm trying to implement a mapreduce-like method in Golang. My design is as follows:

  • Map workers pull items off a mapper input channel and output to a mapper output channel

  • The mapper output channel is then read by a single goroutine. This routine maintains a map of previously-seen key-value pairs. If the next item from the mapper output has a matching key, it sends both the new and old values with matching keys to a reduce-input channel.

  • The reduce-input pipeline reduces two values to one key-value pair, and submits the result to the same map-output channel.

This leads to a circular dependency between the mapper output and the reduce input, and I now do not know how to signal that the mapper output is complete (and close the channel).

What is the best way of breaking this cyclic dependency or knowing when to close channels with such cyclical behavior?

The code below has a deadlock with the map output channel and the reduce input channel waiting on each other.

type (
    // MapFn turns one input element into a key and a value.
    MapFn func(input int) (int, int)

    // ReduceFn combines two values that share a key into a single value.
    ReduceFn func(a, b int) int
)

// kvPair is a single key/value item: k is the key, v the value stored under it.
type kvPair struct {
    k, v int
}

// reducePair is one unit of reduce work: two values (v1, v2) that were seen
// under the same key k and still have to be combined into one.
type reducePair struct {
    k, v1, v2 int
}

// MapReduce is the asker's first attempt at a mapreduce-style pipeline: nMappers
// goroutines apply mapFn to the input, the calling goroutine collects the
// resulting key/value pairs in a map, and whenever a key is seen twice the two
// values are handed to one of nReducers goroutines, whose result is fed back
// into the collector.
//
// It is meant to return one reduced value per key (the error is always nil).
// NOTE(review): as written the function never returns. outputMapChan is never
// closed, so the collecting loop below blocks forever once all work has drained.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
    // Both buffered channels can hold one entry per input element.
    inputMapChan := make(chan int, len(input))
    outputMapChan := make(chan *kvPair, len(input))
    // Unbuffered: a send blocks until a reducer is ready to receive.
    reduceInputChan := make(chan *reducePair)
    outputMapMap := make(map[int]int)
    // Feeder: pushes the work items to the mappers, then closes the channel so
    // the mappers' range loops end.
    go func() {
        // NOTE(review): a single-variable range over a slice yields the index,
        // so this sends 0..len(input)-1 rather than the elements of input.
        for v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)
    }()
    // Mappers: turn each work item into a key/value pair.
    for i := 0; i < nMappers; i++ {
        go func() {
            for v := range inputMapChan {
                // The inner v (the mapped value) shadows the loop variable v.
                k, v := mapFn(v)
                outputMapChan <- &kvPair{k, v}
            }
        }()
    }
    // Reducers: combine two values of one key and send the result back to the
    // same channel the mappers write to. This is the cycle the question is about.
    for i := 0; i < nReducers; i++ {
        go func() {
            for v := range reduceInputChan {
                reduceValue := reduceFn(v.v1, v.v2)
                outputMapChan <- &kvPair{v.k, reduceValue}
            }
        }()
    }
    // Collector: keeps at most one value per key in outputMapMap.
    // Nothing in this function closes outputMapChan, so this loop has no exit.
    for v := range outputMapChan {
        key := v.k
        value := v.v
        other, ok := outputMapMap[key]
        if ok {
            // Key already present: take the stored value out of the map and
            // send both values off to be reduced.
            delete(outputMapMap, key)
            reduceInputChan <- &reducePair{key, value, other}
        } else {
            outputMapMap[key] = value
        }
    }
    // Only reached if the loop above ends, i.e. if outputMapChan gets closed.
    return outputMapMap, nil
}
  • 写回答

1条回答 默认 最新

  • doujia4041 2016-08-23 13:45
    关注

    Try this:

    package main
    
    import "fmt"
    import "sync"
    import "sync/atomic"
    import "runtime"
    import "math/rand"
    import "time"
    
    type (
        // MapFn turns one input element into a key/value pair.
        MapFn func(input int) *kvPair

        // ReduceFn combines two values that share a key into a single value.
        ReduceFn func(a, b int) int
    )
    
    // kvPair is a single key/value item: k is the key, v the value stored under it.
    type kvPair struct {
        k, v int
    }
    
    // reducePair is one unit of reduce work: two values (v1, v2) that were seen
    // under the same key k and still have to be combined into one.
    type reducePair struct {
        k, v1, v2 int
    }
    
    // MapReduce applies mapFn to every element of input and then repeatedly
    // combines values that share a key with reduceFn until exactly one value per
    // key is left. That final key -> value map is returned.
    //
    // nMappers goroutines run mapFn and nReducers goroutines run reduceFn. Both
    // counts must be at least 1, otherwise an error is returned (with no mapper
    // nothing is ever produced, with no reducer the first duplicate key blocks
    // forever). All goroutines started here have exited by the time MapReduce
    // returns.
    //
    // The order in which values of one key are paired up is not deterministic, so
    // the result is only reproducible when reduceFn is associative and commutative.
    //
    // Termination: mappers and reducers both write to outputMapChan, and the
    // collector feeds the reducers, so no single sender can close that channel.
    // Instead the number of live values is tracked. It starts at len(input) and
    // drops by one each time a reducer merges two values. Every live value is
    // either stored in outputMapMap or still in flight, so the work is complete
    // exactly when the map holds as many entries as there are live values.
    func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
        if nMappers < 1 || nReducers < 1 {
            return nil, fmt.Errorf("mapreduce: need at least one mapper and one reducer, got %d and %d", nMappers, nReducers)
        }

        inputMapChan := make(chan int, len(input))
        // Never holds more than len(input) values (there are never more live
        // values than that), so sends to it cannot block.
        outputMapChan := make(chan *kvPair, len(input))
        reduceInputChan := make(chan *reducePair)
        outputMapMap := make(map[int]int)

        // The buffer fits the whole input, so it can be filled without a goroutine.
        for _, v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)

        // Written by the reducers, read by the collector: access it atomically.
        live := int64(len(input))

        var workers sync.WaitGroup
        workers.Add(nMappers + nReducers)

        for i := 0; i < nMappers; i++ {
            go func() {
                defer workers.Done()
                for v := range inputMapChan {
                    outputMapChan <- mapFn(v)
                }
            }()
        }

        for i := 0; i < nReducers; i++ {
            go func() {
                defer workers.Done()
                for p := range reduceInputChan {
                    merged := &kvPair{p.k, reduceFn(p.v1, p.v2)}
                    // Two values became one. Decrement before handing the
                    // result over, so the collector can never observe the
                    // merged value together with a stale, too-high count.
                    atomic.AddInt64(&live, -1)
                    outputMapChan <- merged
                }
            }()
        }

        // Collector: runs on the calling goroutine, which is the only one that
        // touches outputMapMap. It blocks on the channel instead of polling.
        for int64(len(outputMapMap)) != atomic.LoadInt64(&live) {
            v := <-outputMapChan
            if other, ok := outputMapMap[v.k]; ok {
                // Second value for this key: take the stored one out and have
                // the pair reduced. The result comes back on outputMapChan.
                delete(outputMapMap, v.k)
                reduceInputChan <- &reducePair{v.k, v.v, other}
            } else {
                outputMapMap[v.k] = v.v
            }
        }

        // Nothing is in flight any more: release the reducers and wait for every
        // worker to exit so that no goroutine outlives the call.
        close(reduceInputChan)
        workers.Wait()
        return outputMapMap, nil
    }
    
    // main runs MapReduce with two mappers and two reducers over a random
    // permutation of 0..999999, using mp and rdc as the map and reduce
    // functions. It prints the CPU count, the elapsed time and the resulting map.
    func main() {
        fmt.Println("NumCPU =", runtime.NumCPU())

        start := time.Now()
        // A small fixed input is handy for debugging:
        //   []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}
        data := rand.Perm(1000000)

        result, err := MapReduce(mp, rdc, data, 2, 2)
        if err != nil {
            panic(err)
        }

        elapsed := time.Since(start)
        fmt.Println(elapsed) // the answer's author measured 883ms
        fmt.Println(result)
        fmt.Println("done.")
    }
    
    // mp is the demo map function: the low three bits of input become the key
    // and the remaining bits (input shifted right by three) become the value.
    func mp(input int) *kvPair {
        pair := new(kvPair)
        pair.k = input & 7
        pair.v = input >> 3
        return pair
    }
    // rdc is the demo reduce function: it shifts b left by three bits and ORs a
    // into the result. OR-ing a zero changes nothing, so no special case is
    // needed for a == 0. The result depends on the argument order.
    func rdc(a int, b int) int {
        return b<<3 | a
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 为什么使用javacv转封装rtsp为rtmp时出现如下问题:[h264 @ 000000004faf7500]no frame?
  • ¥15 乘性高斯噪声在深度学习网络中的应用
  • ¥15 运筹学排序问题中的在线排序
  • ¥15 关于docker部署flink集成hadoop的yarn,请教个问题 flink启动yarn-session.sh连不上hadoop,这个整了好几天一直不行,求帮忙看一下怎么解决
  • ¥30 求一段fortran代码用IVF编译运行的结果
  • ¥15 深度学习根据CNN网络模型,搭建BP模型并训练MNIST数据集
  • ¥15 C++ 头文件/宏冲突问题解决
  • ¥15 用comsol模拟大气湍流通过底部加热(温度不同)的腔体
  • ¥50 安卓adb backup备份子用户应用数据失败
  • ¥20 有人能用聚类分析帮我分析一下文本内容嘛