dqby43944 2015-04-23 15:26
Viewed 54 times

"Fan in" - a "fan out" behavior

Say we have three methods that implement "fan in" behavior:

package main

import (
    "fmt"
    "reflect"
    "sync"
    "time"
)

func MakeChannel(tries int) chan int {
    ch := make(chan int)

    go func() {
        for i := 0; i < tries; i++ {
            ch <- i
        }
        close(ch)
    }()

    return ch
}

func MergeByReflection(channels ...chan int) chan int {
    length := len(channels)
    out := make(chan int)
    cases := make([]reflect.SelectCase, length)
    for i, ch := range channels {
        cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
    }
    go func() {
        for length > 0 {
            i, line, opened := reflect.Select(cases)
            if !opened {
                cases[i].Chan = reflect.ValueOf(nil)
                length -= 1
            } else {
                out <- int(line.Int())
            }
        }
        close(out)
    }()
    return out
}

// Note: this version is hardwired for exactly five input channels.
func MergeByCode(channels ...chan int) chan int {
    length := len(channels)
    out := make(chan int)
    go func() {
        var i int
        var ok bool

        for length > 0 {
            select {
            case i, ok = <-channels[0]:
                if !ok {
                    channels[0] = nil // closed: a nil channel never fires in select
                    length -= 1
                } else {
                    out <- i
                }
            case i, ok = <-channels[1]:
                if !ok {
                    channels[1] = nil
                    length -= 1
                } else {
                    out <- i
                }
            case i, ok = <-channels[2]:
                if !ok {
                    channels[2] = nil
                    length -= 1
                } else {
                    out <- i
                }
            case i, ok = <-channels[3]:
                if !ok {
                    channels[3] = nil
                    length -= 1
                } else {
                    out <- i
                }
            case i, ok = <-channels[4]:
                if !ok {
                    channels[4] = nil
                    length -= 1
                } else {
                    out <- i
                }
            }
        }
        close(out)
    }()
    return out
}

func MergeByGoRoutines(channels ...chan int) chan int {
    var group sync.WaitGroup

    out := make(chan int)
    group.Add(len(channels)) // register all workers before any of them can call Done
    for _, ch := range channels {
        go func(ch chan int) {
            for i := range ch {
                out <- i
            }
            group.Done()
        }(ch)
    }
    go func() {
        group.Wait()
        close(out)
    }()
    return out
}

type MergeFn func(...chan int) chan int

func main() {
    length := 5
    tries := 1000000
    channels := make([]chan int, length)
    fns := []MergeFn{MergeByReflection, MergeByCode, MergeByGoRoutines}

    for _, fn := range fns {
        sum := 0
        t := time.Now()
        for i := 0; i < length; i++ {
            channels[i] = MakeChannel(tries)
        }
        for i := range fn(channels...) {
            sum += i
        }
        fmt.Println(time.Since(t))
        fmt.Println(sum)
    }
}

Results with 1 CPU (using runtime.GOMAXPROCS(1)):
19.869s (MergeByReflection)
2499997500000
8.483s (MergeByCode)
2499997500000
4.977s (MergeByGoRoutines)
2499997500000

Results with 2 CPUs (using runtime.GOMAXPROCS(2)):
44.94s (MergeByReflection)
2499997500000
10.853s (MergeByCode)
2499997500000
3.728s (MergeByGoRoutines)
2499997500000
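
The GOMAXPROCS calls are not shown in main above; presumably something like the following sits at the top of main (with "runtime" added to the imports):

func main() {
    runtime.GOMAXPROCS(1) // 1 for the first run, 2 for the second

    // ... rest of main as shown above ...
}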

  • I understand why MergeByReflection is the slowest, but what explains the difference between MergeByCode and MergeByGoRoutines?
  • And why does the select statement (used directly in MergeByCode and via reflect.Select in MergeByReflection) become slower when we increase the number of CPUs?

1 answer

  • dongwei6700 2015-04-23 18:12

    Here is a preliminary remark: the channels in your examples are all unbuffered, so they will likely block on every send and receive.

    In this example there is almost no processing apart from channel management, so performance is dominated by the synchronization primitives. Very little of this code can actually be parallelized.

    In the MergeByReflection and MergeByCode functions, select is used to listen to multiple input channels, but nothing is done to take the output channel into account (it may therefore block while a value is already available on one of the input channels).

    In the MergeByGoRoutines function, this situation cannot happen: when the output channel blocks, it does not prevent another input channel from being read by another goroutine. The runtime therefore has better opportunities to parallelize the goroutines, and there is less contention on the input channels.
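
    To make that concrete, here is a minimal sketch of my own (a hypothetical helper, not code from the question) of a two-input merge whose select also covers the output side: a send case is only enabled while a value is pending, because a nil channel never fires in a select. Unlike the original functions, it buffers pending values in a slice instead of applying backpressure to the producers:

    func mergeTwoWithSend(a, b chan int) chan int {
        out := make(chan int)
        go func() {
            var pending []int // received but not yet forwarded
            for a != nil || b != nil || len(pending) > 0 {
                // Enable the send case only when there is something to send.
                var sendCh chan int
                var next int
                if len(pending) > 0 {
                    sendCh = out
                    next = pending[0]
                }
                select {
                case v, ok := <-a:
                    if !ok {
                        a = nil // closed: a nil channel never fires, case disabled
                    } else {
                        pending = append(pending, v)
                    }
                case v, ok := <-b:
                    if !ok {
                        b = nil
                    } else {
                        pending = append(pending, v)
                    }
                case sendCh <- next:
                    pending = pending[1:]
                }
            }
            close(out)
        }()
        return out
    }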

    The MergeByReflection code is the slowest because it has the overhead of reflection, and almost nothing can be parallelized.

    The MergeByGoRoutines function is the fastest because it reduces contention (less synchronization is needed) and because contention on the output has less impact on the input side. It can therefore benefit from a small improvement when running on multiple cores (unlike the other two methods).

    There is so much synchronization activity with MergeByReflection and MergeByCode that running on multiple cores negatively impacts their performance. You could see different performance with buffered channels, though.
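
    For instance, here is a minimal sketch of a buffered variant of the question's MakeChannel (the buffer size is an arbitrary parameter to experiment with, not something taken from the question). With some capacity, a producer can run ahead of the merging goroutine, so fewer sends have to block:

    func MakeBufferedChannel(tries, buf int) chan int {
        ch := make(chan int, buf) // e.g. buf = 100
        go func() {
            for i := 0; i < tries; i++ {
                ch <- i
            }
            close(ch)
        }()
        return ch
    }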

