droe9376 2014-06-03 07:18
浏览 23
已采纳

在多个线程中运行一个函数

I have implemented a function contractGraph which calculates a minimal cut of a graph using randomized contraction. I am running it a specified number of times and calculating the minimum cut:

minCut := 0
for i := 0; i < totalCount; i++ {
    _minCut := contractGraph(graph)
    if minCut == 0 || _minCut < minCut {
        minCut = _minCut
    }
}

contractGraph does CPU intensive calculations, but the program uses only one CPU core on my machine. I want to modify it, so at any time 4 parallel executions of contractGraph happen, the results are put in channel and are read synchronously and the minimum is calculated.

I tried:

func worker(graph Graph, i int, workerChan <- chan bool, minCutChan chan <- int) {
    defer func () { <- workerChan }()
    min_cut := contractGraph(graph)
    minCutChan <- min_cut
}


func workerRunner(graph Graph, minCutChan chan int, totalCount int, workerCount int) {
    workerChan := make(chan bool, workerCount)
    for i := 0; i < totalCount; i++ {
        go worker(graph, i, workerChan, minCutChan)
    }
}

    minCutChan := make(chan int)
    go workerRunner(graph, minCutChan, totalCount, 4)

    // read the resulting min cuts
    minCut := 0
    for _minCut := range minCutChan {
        if minCut == 0 || _minCut < minCut {
            minCut = _minCut
        }
    }

But still only one core is used and I get at the end:

fatal error: all goroutines are asleep - deadlock!

Also I don't like having to channels, I think it should be possible to have only one channel with the results.

What pattern would you recommend to use?

  • 写回答

1条回答 默认 最新

  • douliang2935 2014-06-03 07:51
    关注

    You forgot to close the minCutChan so main is stuck into range and all the go routines have completed.

    to not use the channel you can use sync.WaitGroup

    EDIT: To handle the totalCount I would use atomic.AddInt64 see the new updated examples:

    see a working mock example with these edits: http://play.golang.org/p/WyCQrWK5aa

    package main
    
    import (
        "fmt"
        "sync"
        "sync/atomic"
    )
    
    type Graph struct {
    }
    
    func contractGraph(Graph) int { return 0 }
    
    func worker(wg *sync.WaitGroup, graph Graph, i int, minCutChan chan<- int) {
        defer wg.Done()
        for {
            count := atomic.AddInt64(&totalCount, -1) 
            if count < 0 {
                break
            }
            fmt.Println("Worker Iteration", count)
            min_cut := contractGraph(graph)
            minCutChan <- min_cut
        }
    }
    
    func workerRunner(graph Graph, minCutChan chan int, workerCount int) {
        wg := new(sync.WaitGroup)
        wg.Add(workerCount)
        for i := 0; i < workerCount; i++ {
            go worker(wg, graph, i, minCutChan)
        }
        wg.Wait()
        close(minCutChan)
    }
    
    var totalCount int64
    
    func main() {
        workerCount := 4
        graph := Graph{}
        totalCount = 100
        minCutChan := make(chan int, workerCount+1)
        go workerRunner(graph, minCutChan, workerCount)
    
        go func() {
        }()
    
        // read the resulting min cuts
        minCut := 0
        for _minCut := range minCutChan {
            if minCut == 0 || _minCut < minCut {
                minCut = _minCut
            }
        }
        fmt.Println(minCut)
    }
    

    even more in go style is to spin the workers inside an anonymous function:

    http://play.golang.org/p/nT0uUutQyS

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )
    
    type Graph struct {
    }
    
    func contractGraph(Graph) int { return 0 }
    
    var totalCount int64
    
    func workerRunner(graph Graph, minCutChan chan int, workerCount int) {
        var wg sync.WaitGroup
        wg.Add(workerCount)
        for i := 0; i < workerCount; i++ {
            go func() {
                defer wg.Done()
                for {
                    count := atomic.AddInt64(&totalCount, -1)
                    if count < 0 {
                        break
                    }
                    fmt.Println("Worker Iteration", count)
    
                    min_cut := contractGraph(graph)
                    minCutChan <- min_cut
                }
            }()
        }
        wg.Wait()
        close(minCutChan)
    }
    
    func main() {
        workerCount := 4
        totalCount = 100
        graph := Graph{}
        minCutChan := make(chan int, workerCount+1)
        go workerRunner(graph, minCutChan, workerCount)
    
        // read the resulting min cuts
        minCut := 0
        for _minCut := range minCutChan {
            if minCut == 0 || _minCut < minCut {
                minCut = _minCut
            }
        }
        fmt.Println(minCut)
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥50 安装pyaudiokits失败
  • ¥15 计组这些题应该咋做呀
  • ¥60 更换迈创SOL6M4AE卡的时候,驱动要重新装才能使用,怎么解决?
  • ¥15 让node服务器有自动加载文件的功能
  • ¥15 jmeter脚本回放有的是对的有的是错的
  • ¥15 r语言蛋白组学相关问题
  • ¥15 Python时间序列如何拟合疏系数模型
  • ¥15 求学软件的前人们指明方向🥺
  • ¥50 如何增强飞上天的树莓派的热点信号强度,以使得笔记本可以在地面实现远程桌面连接
  • ¥20 双层网络上信息-疾病传播