doujing1858 2018-03-05 05:56
浏览 39
已采纳

如何正确实现并发的goroutine(和/或限制它们)以产生一致的结果?

I'm using this: (symbols is []string as well as filteredSymbols)

concurrency := 5
sem := make(chan bool, concurrency)

for i := range symbols {
    sem <- true
    go func(int) {
        defer func() { <-sem }()
        rows, err := stmt.Query(symbols[i])
        if <some condition is true> {
            filteredSymbols = append(filteredSymbols, symbols[i])
        }
    }(i)
}
for i := 0; i < cap(sem); i++ {
    sem <- true
}

to limit number of goroutines running concurrently. I need to limit it because every goroutine interacts with Postgres database and sometimes I do have more than 3000 symbols to evaluate. The code is for analysing big financial data, stocks and other securities. I'm also using same code to get OHLC and pre-calculated data from db. Is this a modern approach for this? I'm asking this because WaitGroups already exist and I'm looking for a way to use those instead.

Also, I observed that my method above sometimes yield different results. I had a code where sometimes the resulting number of filteredSymbols is 1409. Without changing the parameters, it would then yield 1407 results, then 1408 at times. I even had a code where there were big deficit in results.

The code below was very inconsistent so I removed the concurrency. (NOTE that in code below, I don't even have to limit concurrent goroutines since they only use in-memory resources). Removing concurrency fixed it

func getCommonSymbols(symbols1 []string, symbols2 []string) (symbols []string) {
    defer timeTrack(time.Now(), "Get common symbols")
    // concurrency := len(symbols1)
    // sem := make(chan bool, concurrency)

    // for _, s := range symbols1 {
    for _, sym := range symbols1 {
        // sym := s
        // sem <- true
        // go func(string) {
        // defer func() { <-sem }()
        for k := range symbols2 {
            if sym == symbols2[k] {
                symbols = append(symbols, sym)
                break
            }
        }
        // }(sym)
    }
    // for i := 0; i < cap(sem); i++ {
    //  sem <- true
    // }
    return
}
  • 写回答

1条回答 默认 最新

  • duanquan1876 2018-03-06 04:36
    关注

    You have a data race, multiple goroutines are updating filteredSymbols at the same time. The smallest change you can make to fix it is to add a mutex lock around the append call, e.g.

    concurrency := 5
    sem := make(chan bool, concurrency)
    l := sync.Mutex{}
    for i := range symbols {
        sem <- true
        go func(int) {
            defer func() { <-sem }()
            rows, err := stmt.Query(symbols[i])
            if <some condition is true> {
                l.Lock()
                filteredSymbols = append(filteredSymbols, symbols[i])
                l.Unlock()
            }
        }(i)
    }
    for i := 0; i < cap(sem); i++ {
        sem <- true
    }
    

    The Race Detector could of helped you spot this as well. One common alternative would be to use a channel to get work into a goroutine, and a channel to get the results out, something like.

    concurrency := 5
    workCh := make(chan string, concurrency)
    resCh := make(chan string, concurrency)
    workersWg := sync.WaitGroup{}
    // start the required number of workers, use the WaitGroup to see when they're done
    for i := 0; i < concurrency; i++ {
       workersWg.Add(1)
       go func() {
         defer workersWg.Done()
         for symbol := range workCh {
              // do some work
              if cond {
                  resCh <- symbol
              }
         }
       }()
    }
    go func() {
        // when all the workers are done, close the resultsCh
        workersWg.Wait()
        close(resCh)
    }()
    // submit all the work
    for _, s := range symbols {
        workCh <- s
    }
    close(workCh)
    // collect up the results 
    for r := range resCh {
        filteredSymbols = append(filteredSymbols, r)
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 win10权限管理,限制普通用户使用删除功能
  • ¥15 minnio内存占用过大,内存没被回收(Windows环境)
  • ¥65 抖音咸鱼付款链接转码支付宝
  • ¥15 ubuntu22.04上安装ursim-3.15.8.106339遇到的问题
  • ¥15 求螺旋焊缝的图像处理
  • ¥15 blast算法(相关搜索:数据库)
  • ¥15 请问有人会紧聚焦相关的matlab知识嘛?
  • ¥15 网络通信安全解决方案
  • ¥50 yalmip+Gurobi
  • ¥20 win10修改放大文本以及缩放与布局后蓝屏无法正常进入桌面