dtb81443 2018-07-25 13:59

Golang concurrency: processing a batch of items

I am writing a program to process millions of lines from a text file. Validating 500k lines was taking 5 seconds, and I wanted to speed this up.

I wanted to loop over the items, process up to x of them concurrently, and check each response to decide whether I should continue.

I have written some dummy code, but I am not sure whether it makes much sense; it seems rather complicated. Is there a simpler, more elegant way of doing this?

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // Need an object to loop over
    // need a loop to read the response
    items := 100000
    concurrency := 20
    sem := make(chan bool, concurrency)
    returnChan := make(chan error)
    finChan := make(chan bool)

    var wg sync.WaitGroup

    go func() {
        for x := 0; x < items; x++ {
            // loop over all items,
            // running at most `concurrency` of them at a time
            wg.Add(1)
            sem <- true
            go delayFunc(x, sem, returnChan, &wg)
        }
        wg.Wait()
        finChan <- true
    }()

    var err error
    finished := false
    for {
        select {
        case err = <-returnChan:
            if err != nil {
                break
            }
        case _ = <-finChan:
            finished = true
            break
        }

        if err != nil || finished == true {
            break
        }
    }
    fmt.Println(err)
}

func delayFunc(x int, sem chan bool, returnChan chan error, wg *sync.WaitGroup) {
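    //fmt.Printf("PROCESSING (%v)\n", x)
    time.Sleep(10 * time.Millisecond)
    if x == 95000 {
        returnChan <- fmt.Errorf("Something not right")
    } else {
        returnChan <- nil
    }
    // Release the slot and call Done only after the result has been received,
    // so wg.Wait (and therefore finChan) cannot overtake a pending error.
    <-sem // release the lock
    wg.Done()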
}

  • doupa1883 2018-07-25 16:05

    Your code looks fine; it implements a pattern commonly used in Go. The downside is that you spawn a goroutine for every item. Goroutines are cheap, but not free. Another approach is to spawn N workers and feed them items through a channel, something like this:

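    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        items := 100
        concurrency := 10
        in := make(chan int)
        ret := make(chan error)

        // Start a fixed pool of workers that all read from the same channel.
        var wg sync.WaitGroup
        for x := 0; x < concurrency; x++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                worker(in, ret)
            }()
        }

        // Feed every item to the pool, then close in so the workers exit.
        go func() {
            for x := 0; x < items; x++ {
                in <- x
            }
            close(in)
        }()

        // Close ret once all workers are done, so the range below ends
        // only after every result has been received.
        go func() {
            wg.Wait()
            close(ret)
        }()

        // Stop at the first error. After a break, the remaining goroutines stay
        // blocked, which is acceptable here only because main then returns.
        for err := range ret {
            if err != nil {
                fmt.Println(err.Error())
                break
            }
        }
    }

    func worker(in <-chan int, returnChan chan<- error) {
        for x := range in {
            //fmt.Printf("PROCESSING (%v)\n", x)
            time.Sleep(10 * time.Millisecond) // simulate the work for item x
            if x == 95 {
                returnChan <- fmt.Errorf("Something not right")
            } else {
                returnChan <- nil
            }
        }
    }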
    


    The asker accepted this answer as the best answer.
