douxiong0668 2013-06-13 23:12 Acceptance rate: 100%

Results differ with N > 1 goroutines (on N > 1 CPUs). Why?

I have a test program that gives different results when it runs more than one goroutine on more than one CPU (goroutines = CPUs). The "test" is about syncing goroutines using channels, and the program itself counts occurrences of chars in strings. It produces consistent results on one CPU / one goroutine.

See the code example on the playground (note: run it on a local machine to execute on multiple cores, and watch the resulting numbers vary): http://play.golang.org/p/PT5jeCKgBv .

Code summary: The program counts occurrences of 4 different chars (A, T, G, C) in (DNA) strings.

Problem: The result (number of occurrences of chars) varies when executed on multiple CPUs (goroutines). Why?

Description:

  1. A goroutine (SpawnWork) sends work to the Workers as strings. It sets up artificial string input data (hardcoded strings are copied n times).
  2. Worker goroutines (Worker) are created, as many as there are CPUs.
  3. Each Worker checks every char in a string, counts the A's and T's and sends that sum into one channel, and sends the G and C count into another channel.
  4. SpawnWork closes the work-string channel to control the Workers (which consume strings using range, which quits when the input channel is closed by SpawnWork).
  5. When a Worker has consumed its range (of chars) it sends a quit signal on the quit channel (quit <- true). These "pulses" will occur CPU-count times (CPU count = goroutine count).
  6. The main (select) loop quits when it has received CPU-count quit signals.
  7. The main func prints a summary of the occurrences of chars (A/T's, G/C's).

Simplified code:

1. "Worker" (goroutines) counting chars in lines:

func Worker(inCh chan *[]byte, resA chan<- *int, resB chan<- *int, quit chan bool) {
    //for p_ch := range inCh {
    for {
        p_ch, ok := <-inCh // similar to range
        if ok {
            ch := *p_ch
            for i := 0; i < len(ch); i++ {
                if ch[i] == 'A' || ch[i] == 'T' {        // Count A:s and T:s
                    at++
                } else if ch[i] == 'G' || ch[i] == 'C' { // Count G:s and C:s
                    gc++
                }
            }
            resA <- &at  // Send line results on separate channels
            resB <- &gc  // Send line results on separate channels
        } else {
            quit <- true // Indicate that we're all done
            break
        }
    }
}

2. Spawn work (strings) to workers:

func SpawnWork(inStr chan<- *[]byte, quit chan bool) {
    // Artificial input data
    StringData :=
        "NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
        "NTGAGAAATATGCTTTCTACTTTTTTGTTTAATTTGAACTTGAAAACAAAACACACACAA\n" +
        "... etc\n" +
    // ...
    for scanner.Scan() {
        s := scanner.Bytes()
        if len(s) == 0 || s[0] == '>' {
            continue
        } else {
            i++
            inStr <- &s
        }
    }
    close(inStr) // Indicate (to Workers) that there's no more strings coming.
}

3. Main routine:

func main() {
    // Count Cpus, and count down in final select clause
    CpuCnt := runtime.NumCPU() 
    runtime.GOMAXPROCS(CpuCnt)
    // Make channels
    resChA := make(chan *int)
    resChB := make(chan *int)
    quit := make(chan bool)
    inStr := make(chan *[]byte)

    // Set up Workers ( n = Cpu )
    for i := 0; i < CpuCnt; i++ {
        go Worker(inStr, resChA, resChB, quit)
    }
    // Send lines to Workers
    go SpawnWork(inStr, quit)

    // Count the number of "A","T" & "G","C" per line 
    // (comes in here as ints per row, on separate channels (at and gt))
    for {
        select {
        case tmp_at := <-resChA:
            tmp_gc := <-resChB // Ch A and B go in pairs anyway
            A += *tmp_at       // sum of A's and T's
            B += *tmp_gc       // sum of G's and C's
        case <-quit:
            // Each goroutine sends "quit" signals when it's done. Since 
            // the number of goroutines equals the Cpu counter, we count 
            // down each time a goroutine tells us it's done (quit at 0):
            CpuCnt--
            if CpuCnt == 0 { // When all goroutines are done then we're done.
                goto out     
            }
        }
    }
out:
    // Print report to screen
}

Why does this code count consistently only when executed on a single CPU/goroutine? That is, do the channels not sync, or does the main loop quit forcefully before all goroutines are done? Scratching my head.

(Again: See/run the full code at the playground: http://play.golang.org/p/PT5jeCKgBv )

// Rolf Lampa

1 answer

  • douping3891 2013-06-14 07:41

    Here is a working version which consistently produces the same results no matter how many cpus are used.

    Here is what I did:

    • removed passing of *int - sending a pointer over a channel is very racy, because sender and receiver then share the same variable
    • removed passing of *[]byte - pointless, as slices are reference types anyway
    • copied the slice before putting it in the channel - otherwise the slice points to the same memory, causing a race
    • fixed the initialisation of at and gc in Worker - they were in the wrong place - this was the major cause of the difference in results
    • used sync.WaitGroup and channel close() for synchronisation
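
As an illustration of the at/gc item above, here is a minimal single-goroutine sketch. It assumes the original counters were declared once, before the per-line loop (the simplified Worker in the question does not show where they are declared), so that every value sent was a running total rather than a per-line count. The helper names reportsShared, reportsPerLine and sumReports are made up for this example.

```go
package main

import "fmt"

// sumReports adds up the reported values, the way the main loop sums
// whatever arrives on a result channel.
func sumReports(reports []int) int {
	total := 0
	for _, r := range reports {
		total += r
	}
	return total
}

// reportsShared mimics the assumed original: one counter declared before
// the loop and never reset, so each report is a running total.
func reportsShared(lines []string) []int {
	var reports []int
	at := 0
	for _, line := range lines {
		for i := 0; i < len(line); i++ {
			if line[i] == 'A' || line[i] == 'T' {
				at++
			}
		}
		reports = append(reports, at)
	}
	return reports
}

// reportsPerLine mimics the fixed Worker: the counter starts at zero for
// every line, so each report covers exactly one line.
func reportsPerLine(lines []string) []int {
	var reports []int
	for _, line := range lines {
		at := 0
		for i := 0; i < len(line); i++ {
			if line[i] == 'A' || line[i] == 'T' {
				at++
			}
		}
		reports = append(reports, at)
	}
	return reports
}

func main() {
	lines := []string{"AT", "GATC", "TTT"}
	fmt.Println(sumReports(reportsShared(lines)))  // 2 + 4 + 7 = 13
	fmt.Println(sumReports(reportsPerLine(lines))) // 2 + 2 + 3 = 7
}
```

With several workers, which lines land on which worker changes from run to run, so under this assumption the running totals (and the final sum) would change too. Sending a pointer to the live counter adds a data race on top of that.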

    I used the -race flag of go build to find and fix the data races.

    package main
    
    import (
        "bufio"
        "fmt"
        "runtime"
        "strings"
        "sync"
    )
    
    func Worker(inCh chan []byte, resA chan<- int, resB chan<- int, wg *sync.WaitGroup) {
        defer wg.Done()
        fmt.Println("Worker started...")
        for ch := range inCh {
            at := 0
            gc := 0
            for i := 0; i < len(ch); i++ {
                if ch[i] == 'A' || ch[i] == 'T' {
                    at++
                } else if ch[i] == 'G' || ch[i] == 'C' {
                    gc++
                }
            }
            resA <- at
            resB <- gc
        }
    
    }
    
    func SpawnWork(inStr chan<- []byte) {
        fmt.Println("Spawning work:")
        // An artificial input source.
        StringData :=
            "NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
                "NTGAGAAATATGCTTTCTACTTTTTTGTTTAATTTGAACTTGAAAACAAAACACACACAA\n" +
                "CTTCCCAATTGGATTAGACTATTAACATTTCAGAAAGGATGTAAGAAAGGACTAGAGAGA\n" +
                "TATACTTAATGTTTTTAGTTTTTTAAACTTTACAAACTTAATACTGTCATTCTGTTGTTC\n" +
                "AGTTAACATCCCTGAATCCTAAATTTCTTCAGATTCTAAAACAAAAAGTTCCAGATGATT\n" +
                "TTATATTACACTATTTACTTAATGGTACTTAAATCCTCATTNNNNNNNNCAGTACGGTTG\n" +
                "TTAAATANNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
                "NNNNNNNCTTCAGAAATAAGTATACTGCAATCTGATTCCGGGAAATATTTAGGTTCATAA\n"
        // Expand data n times
        tmp := StringData
        for n := 0; n < 1000; n++ {
            StringData = StringData + tmp
        }
        scanner := bufio.NewScanner(strings.NewReader(StringData))
        scanner.Split(bufio.ScanLines)
    
        var i int
        for scanner.Scan() {
            s := scanner.Bytes()
            if len(s) == 0 || s[0] == '>' {
                continue
            } else {
                i++
                s_copy := append([]byte(nil), s...)
                inStr <- s_copy
            }
        }
        close(inStr)
    }
    
    func main() {
        CpuCnt := runtime.NumCPU() // One worker per CPU
        CpuOut := CpuCnt           // Save for print report
        runtime.GOMAXPROCS(CpuCnt)
        fmt.Printf("Processors: %d\n", CpuCnt)
    
        resChA := make(chan int)
        resChB := make(chan int)
        inStr := make(chan []byte)
    
        fmt.Println("Spawning workers:")
        var wg sync.WaitGroup
        for i := 0; i < CpuCnt; i++ {
            wg.Add(1)
            go Worker(inStr, resChA, resChB, &wg)
        }
        fmt.Println("Spawning work:")
        go func() {
            SpawnWork(inStr)
            wg.Wait()
            close(resChA)
            close(resChB)
        }()
    
        A := 0
        B := 0
        LineCnt := 0
        for tmp_at := range resChA {
            tmp_gc := <-resChB // These go together anyway
            A += tmp_at
            B += tmp_gc
            LineCnt++
        }
    
        if !(A+B > 0) {
            fmt.Println("No A/B was found!")
        } else {
            ABFraction := float32(B) / float32(A+B)
            fmt.Println("\n----------------------------")
            fmt.Printf("Cpu's  : %d\n", CpuOut)
            fmt.Printf("Lines  : %d\n", LineCnt)
            fmt.Printf("A+B    : %d\n", A+B)
            fmt.Printf("A      : %d\n", A)
            fmt.Printf("B      : %d\n", B)
            fmt.Printf("AB frac: %v\n", ABFraction*100)
            fmt.Println("----------------------------")
        }
    }
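
A possible variation on the program above, offered only as a sketch: since the two counts always travel together, they could be sent as one value on a single channel, which removes the need to pair receives from resChA and resChB. The counts type and the countLine and countAll helpers are hypothetical names introduced for this example. The WaitGroup and close() pattern is the same as in the program above.

```go
package main

import (
	"fmt"
	"sync"
)

// counts is a hypothetical result type carrying both sums for one line.
type counts struct{ at, gc int }

// countLine counts A/T and G/C bytes in a single line.
func countLine(line []byte) counts {
	var c counts
	for _, b := range line {
		switch b {
		case 'A', 'T':
			c.at++
		case 'G', 'C':
			c.gc++
		}
	}
	return c
}

// countAll fans the lines out to the given number of worker goroutines
// and sums their results. workers must be at least 1.
func countAll(lines [][]byte, workers int) counts {
	in := make(chan []byte)
	out := make(chan counts)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range in {
				out <- countLine(line) // sent by value, nothing is shared
			}
		}()
	}

	go func() {
		for _, line := range lines {
			in <- line
		}
		close(in)  // lets the workers' range loops end
		wg.Wait()  // every worker has sent its last result
		close(out) // lets the summing loop below end
	}()

	var total counts
	for c := range out {
		total.at += c.at
		total.gc += c.gc
	}
	return total
}

func main() {
	lines := [][]byte{[]byte("NTGA"), []byte("GATTACA"), []byte("CCGG")}
	for _, n := range []int{1, 2, 4} {
		fmt.Println(n, countAll(lines, n))
	}
}
```

Because each result is a value, not a pointer, a worker can move on to the next line without affecting a result the main goroutine has not read yet.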
    
    This answer was selected as the best answer by the asker.