dpquu9206 2018-11-02 07:29

Undefined behavior when loading a large CSV concurrently with goroutines

I am trying to load a big CSV file concurrently using goroutines in Go. The CSV has dimensions (254882, 100). But when I parse the CSV in goroutines and store it in a 2D slice, I get fewer than 254882 rows, and the number varies from run to run. I suspect the goroutines are the cause but can't pinpoint the reason. Can anyone please help? I am also new to Go. Here is my code:

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    r := csv.NewReader(bufio.NewReader(f))
    counter := 0
    var wg sync.WaitGroup
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if counter != 0 {
            wg.Add(1)
            go func(r []string, dataset *[][]float64) {
                var temp []float64
                for _, each := range record {
                    f, err := strconv.ParseFloat(each, 64)
                    if err == nil {
                        temp = append(temp, f)
                    }
                }
                *dataset = append(*dataset, temp)
                wg.Done()
            }(record, &dataset)
        }
        counter++
    }
    wg.Wait()
    duration := time.Now().Sub(startTime)
    log.Printf("Loaded %d rows in %v seconds", counter, duration)
    return &dataset, nil
}

And my main function looks like this:

func main() {
    // runtime.GOMAXPROCS(4)
    dataset, err := loadCSV("AvgW2V_train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(*dataset))
}

If anyone needs the CSV, it can be downloaded here (485 MB): https://drive.google.com/file/d/1G4Nw6JyeC-i0R1exWp5BtRtGM1Fwyelm/view?usp=sharing


  • douguo7431 2018-11-02 08:19

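    There is no need to return *[][]float64. A slice value is already a small header (a pointer to the backing array, plus length and capacity), so a pointer to a slice adds a second level of indirection; returning [][]float64 directly is the idiomatic choice.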
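A minimal sketch of why returning the slice is enough: the slice header is passed by value, so an append inside a callee only changes the callee's copy, and the usual Go idiom is to return the new slice and let the caller reassign it. The helper names appendLocal and appendAndReturn are hypothetical and exist only for illustration.

```go
package main

import "fmt"

// appendLocal receives a copy of the slice header (pointer, length, capacity),
// so the caller's slice keeps its old length after this returns.
func appendLocal(rows [][]float64, row []float64) {
	rows = append(rows, row) // updates only this function's copy of the header
}

// appendAndReturn hands the updated header back; the caller reassigns it.
func appendAndReturn(rows [][]float64, row []float64) [][]float64 {
	return append(rows, row)
}

func main() {
	var dataset [][]float64

	appendLocal(dataset, []float64{1, 2})
	fmt.Println(len(dataset)) // 0: the append is not visible here

	dataset = appendAndReturn(dataset, []float64{1, 2})
	fmt.Println(len(dataset)) // 1
}
```

In the answer's code the goroutines do not need either form, because the closure captures the dataset variable itself.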

    I have made some minor modifications to your program.

    dataset is visible inside each new goroutine because it is declared in the enclosing function, and a function literal captures the variables of its surrounding scope.

    Similarly, record is also visible, but because the loop reassigns record on every iteration, we pass it to each new goroutine as an argument so that every goroutine works on its own record.

    There is no need to pass dataset: it is the same variable for every goroutine, and that is exactly what we want, since every goroutine appends its temp row to it.

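    However, a race condition occurs when multiple goroutines append to the same variable at once. append reads the slice header, writes the new element (possibly into a freshly allocated backing array), and the result is then stored back into dataset; two goroutines doing this concurrently can overwrite each other's result. That is why the row count is lower than expected and changes from run to run.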
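One possible interleaving that loses a row can be replayed deterministically, step by step, in a single goroutine. This is only an illustration, not a real concurrent run: an actual data race is nondeterministic, and Go's race detector (`go run -race`) is the tool for catching it. The function name simulateRacyAppend is hypothetical.

```go
package main

import "fmt"

// simulateRacyAppend replays one interleaving of two goroutines that both run
// `dataset = append(dataset, row)` without a lock: each reads the shared
// header, appends, then writes the header back.
func simulateRacyAppend() [][]float64 {
	dataset := make([][]float64, 0, 4) // shared slice with spare capacity

	g1 := dataset // goroutine 1 reads the header (len 0)
	g2 := dataset // goroutine 2 reads the same header (len 0)

	g1 = append(g1, []float64{1}) // both appends write into index 0
	g2 = append(g2, []float64{2}) // of the same backing array

	dataset = g1 // goroutine 1 stores len 1
	dataset = g2 // goroutine 2 also stores len 1: one row is lost
	return dataset
}

func main() {
	d := simulateRacyAppend()
	fmt.Println(len(d), d[0]) // 1 [2]
}
```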

    So we need to make sure that only one goroutine appends at any moment. We use a lock (sync.Mutex) to make the appends sequential.

    package main
    
    import (
        "bufio"
        "encoding/csv"
        "fmt"
        "os"
        "strconv"
        "sync"
    )
    
    func loadCSV(csvFile string) [][]float64 {
        var dataset [][]float64
    
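        f, _ := os.Open(csvFile)
        defer f.Close() // release the file handle when loadCSV returns
    
        r := csv.NewReader(f)
        r.Read() // skip the header row, as the question's counter != 0 check did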
    
        var wg sync.WaitGroup
        l := new(sync.Mutex) // lock
    
        for record, err := r.Read(); err == nil; record, err = r.Read() {
            wg.Add(1)
    
            go func(record []string) {
                defer wg.Done()
    
                var temp []float64
                for _, each := range record {
                    if f, err := strconv.ParseFloat(each, 64); err == nil {
                        temp = append(temp, f)
                    }
                }
                l.Lock() // lock before writing
                dataset = append(dataset, temp) // write
                l.Unlock() // unlock
    
            }(record)
        }
    
        wg.Wait()
    
        return dataset
    }
    
    func main() {
        dataset := loadCSV("train.csv")
        fmt.Println(len(dataset))
    }
    

    Some errors are left unhandled to keep the example minimal, but real code should handle them.

    This answer was accepted by the asker as the best answer.