doulin8374 2019-05-27 11:46
浏览 52
已采纳

处理大型csv文件并限制goroutines

I'm trying to find the most efficient way to read a CSV file (~1M rows). Each row contains an HTTP link to an image which I need to download.

This is my current code using worker pools:

// worker serves jobs from queue until a value (or close) is observed on ks.
// For each record received it logs the job, passes the record to processData,
// and then reports completion by sending true on done. worknumber only labels
// the log output.
func worker(queue chan []string, worknumber int, done, ks chan bool) {
    for {
        select {
        case <-ks:
            fmt.Println("worker halted, number", worknumber)
            return
        case record := <-queue:
            fmt.Println("doing work!", record, "worknumber", worknumber)
            processData(record) // HTTP download
            done <- true
        }
    }
}

func main() {
    start := time.Now()
    flag.Parse()
    fmt.Print(strings.Join(flag.Args(), "
"))
    if *filename == "REQUIRED" {
        return
    }

    csvfile, err := os.Open(*filename)
    if err != nil {
        fmt.Println(err)
        return
    }
    count, _ := lineCounter(csvfile)
    fmt.Printf("Total count: %d
", count)
    csvfile.Seek(0, 0)

    defer csvfile.Close()

    //bar := pb.StartNew(count)
    bar := progressbar.NewOptions(count)
    bar.RenderBlank()

    reader := csv.NewReader(csvfile)

    //channel for terminating the workers
    killsignal := make(chan bool)

    //queue of jobs
    q := make(chan []string)
    // done channel takes the result of the job
    done := make(chan bool)

    numberOfWorkers := *numChannels
    for i := 0; i < numberOfWorkers; i++ {
        go worker(q, i, done, killsignal)
    }

    i := 0
    for {
        record, err := reader.Read()
        if err == io.EOF {
            break
        } else if err != nil {
            fmt.Println(err)
            return
        }
        i++

        go func(r []string, i int) {
            q <- r
            bar.Add(1)
        }(record, i)
    }

    // a deadlock occurs if c >= numberOfJobs
    for c := 0; c < count; c++ {
        <-done
    }

    fmt.Println("finished")

    // cleaning workers
    close(killsignal)
    time.Sleep(2 * time.Second)

    fmt.Printf("
%2fs", time.Since(start).Seconds())
}

My issue here is that it spawns a lot of goroutines, uses all the memory and crashes.

What would be the best way to limit it?

  • 写回答

3条回答 默认 最新

  • duanlun2827 2019-05-27 14:51
    关注

    I stripped out the progress bar as I did not want to bother with it, but overall this is closer to what you are looking for.

    It does not genuinely handle errors, they simply fail in a fatal state.

    I have added context and cancellation support.

    You might want to check for https://godoc.org/golang.org/x/sync/errgroup#Group.Go

    As a general recommendation, you need to learn the Go patterns and their usage.

    It is obvious you have not worked that enough, or that you are in process of learning.

    It's not the fastest program at all, but it does the job.

    This is only a draft to get you back on a better direction.

    package main
    
    import (
        "context"
        "encoding/csv"
        "flag"
        "fmt"
        "io"
        "log"
        "os"
        "os/signal"
        "sync"
        "time"
    )
    
    // worker receives records from src and sends one result string per record
    // to dst. It returns when src is closed or when ctx is cancelled.
    //
    // The send on dst also watches ctx, so a cancelled worker never stays
    // blocked on a result that nobody is going to read; such a result is
    // dropped.
    func worker(ctx context.Context, dst chan string, src chan []string) {
        for {
            select {
            case url, ok := <-src: // you must check for readable state of the channel.
                if !ok {
                    return
                }
                res := fmt.Sprintf("out of %v", url) // do something useful.
                select {
                case dst <- res:
                case <-ctx.Done(): // cancelled while waiting for a reader, quit.
                    return
                }
            case <-ctx.Done(): // if the context is cancelled, quit.
                return
            }
        }
    }
    
    func main() {
    
        // create a context
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
        // that cancels at ctrl+C
        go onSignal(os.Interrupt, cancel)
    
        // parse command line arguments
        var filename string
        var numberOfWorkers int
        flag.StringVar(&filename, "filename", "", "src file")
        flag.IntVar(&numberOfWorkers, "c", 2, "concurrent workers")
        flag.Parse()
    
        // check arguments
        if filename == "" {
            log.Fatal("filename required")
        }
    
        start := time.Now()
    
        csvfile, err := os.Open(filename)
        if err != nil {
            log.Fatal(err)
        }
        defer csvfile.Close()
    
        reader := csv.NewReader(csvfile)
    
        // create the pair of input/output channels for the controller=>workers com.
        src := make(chan []string)
        out := make(chan string)
    
        // use a waitgroup to manage synchronization
        var wg sync.WaitGroup
    
        // declare the workers
        for i := 0; i < numberOfWorkers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                worker(ctx, out, src)
            }()
        }
    
        // read the csv and write it to src
        go func() {
            for {
                record, err := reader.Read()
                if err == io.EOF {
                    break
                } else if err != nil {
                    log.Fatal(err)
                }
                src <- record // you might select on ctx.Done().
            }
            close(src) // close src to signal workers that no more job are incoming.
        }()
    
        // wait for worker group to finish and close out
        go func() {
            wg.Wait() // wait for writers to quit.
            close(out) // when you close(out) it breaks the below loop.
        }()
    
        // drain the output
        for res := range out {
            fmt.Println(res)
        }
    
        fmt.Printf("
    %2fs", time.Since(start).Seconds())
    }
    
    // onSignal registers for signal s, blocks until it is delivered once, then
    // invokes h and returns. Run it in its own goroutine to avoid blocking the
    // caller.
    func onSignal(s os.Signal, h func()) {
        // Buffer of one so a delivery is not lost if it arrives before the
        // receive below is reached.
        received := make(chan os.Signal, 1)
        signal.Notify(received, s)

        <-received
        h()
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(2条)

报告相同问题?

悬赏问题

  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 对于相关问题的求解与代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 信号傅里叶变换在matlab上遇到的小问题请求帮助
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作
  • ¥15 求NPF226060磁芯的详细资料