drti52047 2016-08-21 23:55
浏览 17
已采纳

Golang:如何关闭存在循环依赖的通道(channel)

I'm trying to implement a mapreduce-like method in Golang. My design is as follows:

  • Map workers pull items off a mapper input channel and output to a mapper output channel

  • The mapper output channel is then read by a single goroutine. This routine maintains a map of previously-seen key-value pairs. If the next item from the mapper output has a matching key, it sends both the new and old values with matching keys to a reduce-input channel.

  • The reduce-input pipeline reduces two values to one key-value pair, and submits the result to the same map-output channel.

This leads to a circular dependency between the mapper output and the reduce input, and I now do not know how to signal that the mapper output is complete (and close the channel).

What is the best way of breaking this cyclic dependency or knowing when to close channels with such cyclical behavior?

The code below has a deadlock with the map output channel and the reduce input channel waiting on each other.

// Callback signatures accepted by MapReduce.
type (
    // MapFn converts one input value into two ints, which MapReduce uses
    // as a key and a value, in that order.
    MapFn func(input int) (int, int)

    // ReduceFn folds two values into a single one.
    ReduceFn func(a, b int) int
)

// kvPair is one key/value pair travelling through the pipeline.
type kvPair struct {
    k, v int // key first, then the value attached to it
}

// reducePair carries two values that share the key k and are waiting to be
// combined into one.
type reducePair struct {
    k      int // key common to both values
    v1, v2 int // the two values to combine
}

// MapReduce feeds input through nMappers goroutines running mapFn, then keeps
// combining values that share a key with reduceFn on nReducers goroutines.
// Values that currently have no partner rest in outputMapMap, keyed by k.
//
// NOTE(review): this version never returns. Reducers write their results back
// to outputMapChan, so no single goroutine can tell when it is safe to close
// that channel, and nothing ever does. The final range loop therefore blocks
// forever once all mappers and reducers are idle.
func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
    // Both buffered channels can hold one entry per input element.
    inputMapChan := make(chan int, len(input))
    outputMapChan := make(chan *kvPair, len(input))
    // Unbuffered: the collector blocks until a reducer takes the pair.
    reduceInputChan := make(chan *reducePair)
    outputMapMap := make(map[int]int)
    // Producer: pushes work to the mappers, then closes their input.
    go func() {
        // NOTE(review): a single-variable range over a slice yields the index,
        // so the mappers receive 0..len(input)-1 rather than the elements;
        // `for _, v := range input` was presumably intended.
        for v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)
    }()
    // Mappers: exit once inputMapChan is closed and drained.
    for i := 0; i < nMappers; i++ {
        go func() {
            for v := range inputMapChan {
                // The inner k, v shadow the loop variable v from here on.
                k, v := mapFn(v)
                outputMapChan <- &kvPair{k, v}
            }
        }()
    }
    // Reducers: combine two values and send the result back to the same
    // channel the mappers write to. This is the cycle described in the question.
    // reduceInputChan is never closed, so these goroutines never exit.
    for i := 0; i < nReducers; i++ {
        go func() {
            for v := range reduceInputChan {
                reduceValue := reduceFn(v.v1, v.v2)
                outputMapChan <- &kvPair{v.k, reduceValue}
            }
        }()
    }
    // Collector: pair each incoming value with a resting value of the same key,
    // or let it rest if there is none. The loop only ends when outputMapChan is
    // closed, which never happens.
    for v := range outputMapChan {
        key := v.k
        value := v.v
        other, ok := outputMapMap[key]
        if ok {
            // Take the resting value out of the map and hand both to a reducer.
            delete(outputMapMap, key)
            reduceInputChan <- &reducePair{key, value, other}
        } else {
            outputMapMap[key] = value
        }
    }
    return outputMapMap, nil
}
  • 写回答

1条回答 默认 最新

  • doujia4041 2016-08-23 13:45
    关注

    Try this:

    package main
    
    import "fmt"
    import "sync"
    import "sync/atomic"
    import "runtime"
    import "math/rand"
    import "time"
    
    // Callback signatures accepted by MapReduce.
    type (
        // MapFn converts one input value into a key/value pair.
        MapFn func(input int) *kvPair

        // ReduceFn folds two values into a single one.
        ReduceFn func(a, b int) int
    )
    
    // kvPair is one key/value pair travelling through the pipeline.
    type kvPair struct {
        k, v int // key first, then the value attached to it
    }
    
    // reducePair carries two values that share the key k and are waiting to
    // be combined into one.
    type reducePair struct {
        k      int // key common to both values
        v1, v2 int // the two values to combine
    }
    
    // MapReduce maps every element of input to a key/value pair and keeps
    // combining pairs that share a key until one value per key is left.
    //
    // mapFn runs on nMappers goroutines and reduceFn on nReducers goroutines.
    // Reduction results are fed back into the same stream as mapper output, and
    // the order in which the values of a key meet depends on goroutine
    // scheduling, so reduceFn should not depend on that order.
    //
    // The call returns the surviving value for each key once every pair has
    // come to rest; all worker goroutines have exited by then. If nMappers is
    // not positive nothing is mapped and the result is empty. If two values
    // need combining but nReducers is not positive, a nil map and an error are
    // returned instead of blocking forever.
    func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
        inputMapChan := make(chan int, len(input))
        // A reduction turns two pairs into one, so no more than len(input)
        // pairs exist at any time and a send on outputMapChan never blocks.
        outputMapChan := make(chan *kvPair, len(input))
        reduceInputChan := make(chan *reducePair)
        outputMapMap := make(map[int]int)

        // The buffer holds the whole input, so it can be filled right here
        // without a separate producer goroutine.
        for _, v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)

        // unsettled counts the pairs that exist but are not at rest in
        // outputMapMap: unmapped input, pairs queued on outputMapChan and pairs
        // held by a reducer. It is updated by the collector and by the
        // reducers, hence the atomic access. Zero means the work is complete.
        var unsettled int64
        if nMappers > 0 {
            unsettled = int64(len(input))
        }

        var workers sync.WaitGroup
        defer func() {
            // Only the collector below sends on reduceInputChan, so closing it
            // here is safe and lets the reducers leave their range loops.
            close(reduceInputChan)
            workers.Wait()
        }()

        for i := 0; i < nMappers; i++ {
            workers.Add(1)
            go func() {
                defer workers.Done()
                for v := range inputMapChan {
                    outputMapChan <- mapFn(v)
                }
            }()
        }

        for i := 0; i < nReducers; i++ {
            workers.Add(1)
            go func() {
                defer workers.Done()
                for p := range reduceInputChan {
                    reduced := reduceFn(p.v1, p.v2)
                    // Two pairs just became one. The decrement must precede
                    // the send: otherwise the collector could settle the result
                    // while the counter is still one too high and then block
                    // forever waiting for a pair that does not exist.
                    atomic.AddInt64(&unsettled, -1)
                    outputMapChan <- &kvPair{p.k, reduced}
                }
            }()
        }

        // Collector. While unsettled is positive another pair is guaranteed to
        // arrive, so a plain blocking receive replaces polling.
        for atomic.LoadInt64(&unsettled) > 0 {
            v := <-outputMapChan
            other, ok := outputMapMap[v.k]
            if !ok {
                // No partner yet: the pair comes to rest in the map.
                outputMapMap[v.k] = v.v
                atomic.AddInt64(&unsettled, -1)
                continue
            }
            if nReducers <= 0 {
                return nil, fmt.Errorf("mapreduce: key %d needs reducing but nReducers is %d", v.k, nReducers)
            }
            // The resting partner leaves the map and is unsettled again.
            delete(outputMapMap, v.k)
            atomic.AddInt64(&unsettled, 1)
            reduceInputChan <- &reducePair{v.k, v.v, other}
        }
        return outputMapMap, nil
    }
    
    // main times one MapReduce run (two mappers, two reducers) over a random
    // permutation of one million integers, then prints the elapsed time and
    // the resulting map.
    func main() {
        fmt.Println("NumCPU =", runtime.NumCPU())
        start := time.Now()
        data := rand.Perm(1000000)
        // Small hand-written input, handy while debugging:
        //data = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}
        result, err := MapReduce(mp, rdc, data, 2, 2)
        if err != nil {
            panic(err)
        }
        elapsed := time.Since(start)
        fmt.Println(elapsed) // the author measured 883ms
        fmt.Println(result)
        fmt.Println("done.")
    }
    
    // mp splits input into its low three bits, used as the key, and the
    // remaining bits shifted right by three, used as the value.
    func mp(input int) *kvPair {
        key := input & 7
        rest := input >> 3
        return &kvPair{key, rest}
    }
    // rdc shifts b left by three bits and ORs a into the result. ORing a zero
    // changes nothing, so no special case for a == 0 is required.
    func rdc(a int, b int) int {
        shifted := b << 3
        return shifted | a
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 安卓adb backup备份应用数据失败
  • ¥15 eclipse运行项目时遇到的问题
  • ¥15 关于#c##的问题:最近需要用CAT工具Trados进行一些开发
  • ¥15 南大pa1 小游戏没有界面,并且报了如下错误,尝试过换显卡驱动,但是好像不行
  • ¥15 没有证书,nginx怎么反向代理到只能接受https的公网网站
  • ¥50 成都蓉城足球俱乐部小程序抢票
  • ¥15 yolov7训练自己的数据集
  • ¥15 esp8266与51单片机连接问题(标签-单片机|关键词-串口)(相关搜索:51单片机|单片机|测试代码)
  • ¥15 电力市场出清matlab yalmip kkt 双层优化问题
  • ¥30 ros小车路径规划实现不了,如何解决?(操作系统-ubuntu)