doulin8374 2019-05-27 11:46
浏览 52
已采纳

处理大型csv文件并限制goroutines

I'm trying to find the most efficient way to read a CSV file (~1M rows). Each row contains an HTTP link to an image which I need to download.

This is my current code using worker pools:

// worker loops until a value can be received from ks (including when ks is
// closed). Every record taken from queue is logged, passed to processData,
// and then acknowledged by sending true on done.
func worker(queue chan []string, worknumber int, done, ks chan bool) {
    for {
        select {
        case <-ks:
            // Stop request: report and leave the loop for good.
            fmt.Println("worker halted, number", worknumber)
            return
        case record := <-queue:
            fmt.Println("doing work!", record, "worknumber", worknumber)
            processData(record) // HTTP download
            // Blocks until the receiver of done picks the acknowledgement up.
            done <- true
        }
    }
}

func main() {
    start := time.Now()
    flag.Parse()
    fmt.Print(strings.Join(flag.Args(), "
"))
    if *filename == "REQUIRED" {
        return
    }

    csvfile, err := os.Open(*filename)
    if err != nil {
        fmt.Println(err)
        return
    }
    count, _ := lineCounter(csvfile)
    fmt.Printf("Total count: %d
", count)
    csvfile.Seek(0, 0)

    defer csvfile.Close()

    //bar := pb.StartNew(count)
    bar := progressbar.NewOptions(count)
    bar.RenderBlank()

    reader := csv.NewReader(csvfile)

    //channel for terminating the workers
    killsignal := make(chan bool)

    //queue of jobs
    q := make(chan []string)
    // done channel takes the result of the job
    done := make(chan bool)

    numberOfWorkers := *numChannels
    for i := 0; i < numberOfWorkers; i++ {
        go worker(q, i, done, killsignal)
    }

    i := 0
    for {
        record, err := reader.Read()
        if err == io.EOF {
            break
        } else if err != nil {
            fmt.Println(err)
            return
        }
        i++

        go func(r []string, i int) {
            q <- r
            bar.Add(1)
        }(record, i)
    }

    // a deadlock occurs if c >= numberOfJobs
    for c := 0; c < count; c++ {
        <-done
    }

    fmt.Println("finished")

    // cleaning workers
    close(killsignal)
    time.Sleep(2 * time.Second)

    fmt.Printf("
%2fs", time.Since(start).Seconds())
}

My issue here is that it opens a lot of goroutines, uses all the memory, and crashes.

What would be the best way to limit it?

  • 写回答

3条回答 默认 最新

  • duanlun2827 2019-05-27 14:51
    关注

    I stripped out the progress bar as I did not want to bother with it, but overall this is closer to what you are looking for.

    It does not really handle errors; any error simply aborts the program via log.Fatal.

    I have added context and cancellation support.

    You might want to check out https://godoc.org/golang.org/x/sync/errgroup#Group.Go

    As a general recommendation, you need to learn the common Go concurrency patterns and their usage.

    It is obvious you have not worked that enough, or that you are in process of learning.

    It's not the fastest program at all, but it does the job.

    This is only a draft to get you back on a better direction.

    package main
    
    import (
        "context"
        "encoding/csv"
        "flag"
        "fmt"
        "io"
        "log"
        "os"
        "os/signal"
        "sync"
        "time"
    )
    
    // worker receives records from src, formats each one and sends the result
    // on dst. It returns when src is closed or when ctx is cancelled, whichever
    // happens first.
    //
    // Both the receive and the send are guarded by ctx, so a cancelled worker
    // never stays blocked on dst when nobody is reading from it.
    func worker(ctx context.Context, dst chan string, src chan []string) {
        for {
            select {
            case url, ok := <-src: // you must check for readable state of the channel.
                if !ok {
                    return
                }
                res := fmt.Sprintf("out of %v", url) // do something useful.
                select {
                case dst <- res:
                case <-ctx.Done(): // do not block on the send once cancelled.
                    return
                }
            case <-ctx.Done(): // if the context is cancelled, quit.
                return
            }
        }
    }
    
    func main() {
    
        // create a context
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
        // that cancels at ctrl+C
        go onSignal(os.Interrupt, cancel)
    
        // parse command line arguments
        var filename string
        var numberOfWorkers int
        flag.StringVar(&filename, "filename", "", "src file")
        flag.IntVar(&numberOfWorkers, "c", 2, "concurrent workers")
        flag.Parse()
    
        // check arguments
        if filename == "" {
            log.Fatal("filename required")
        }
    
        start := time.Now()
    
        csvfile, err := os.Open(filename)
        if err != nil {
            log.Fatal(err)
        }
        defer csvfile.Close()
    
        reader := csv.NewReader(csvfile)
    
        // create the pair of input/output channels for the controller=>workers com.
        src := make(chan []string)
        out := make(chan string)
    
        // use a waitgroup to manage synchronization
        var wg sync.WaitGroup
    
        // declare the workers
        for i := 0; i < numberOfWorkers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                worker(ctx, out, src)
            }()
        }
    
        // read the csv and write it to src
        go func() {
            for {
                record, err := reader.Read()
                if err == io.EOF {
                    break
                } else if err != nil {
                    log.Fatal(err)
                }
                src <- record // you might select on ctx.Done().
            }
            close(src) // close src to signal workers that no more job are incoming.
        }()
    
        // wait for worker group to finish and close out
        go func() {
            wg.Wait() // wait for writers to quit.
            close(out) // when you close(out) it breaks the below loop.
        }()
    
        // drain the output
        for res := range out {
            fmt.Println(res)
        }
    
        fmt.Printf("
    %2fs", time.Since(start).Seconds())
    }
    
    // onSignal blocks until signal s is delivered to the process, then calls h
    // once and returns. It is meant to be run in its own goroutine.
    func onSignal(s os.Signal, h func()) {
        // signal.Notify never blocks when sending, so a buffer of one keeps a
        // signal that arrives before the receive below from being dropped.
        c := make(chan os.Signal, 1)
        signal.Notify(c, s)
        <-c
        // Unregister before running the handler: later occurrences of s are no
        // longer relayed to c, so they are not silently swallowed by it.
        signal.Stop(c)
        h()
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(2条)

报告相同问题?

悬赏问题

  • ¥15 孟德尔随机化结果不一致
  • ¥15 apm2.8飞控罗盘bad health,加速度计校准失败
  • ¥15 求解O-S方程的特征值问题给出边界层布拉休斯平行流的中性曲线
  • ¥15 谁有desed数据集呀
  • ¥20 手写数字识别运行c仿真时,程序报错错误代码sim211-100
  • ¥15 关于#hadoop#的问题
  • ¥15 (标签-Python|关键词-socket)
  • ¥15 keil里为什么main.c定义的函数在it.c调用不了
  • ¥50 切换TabTip键盘的输入法
  • ¥15 可否在不同线程中调用封装数据库操作的类