dpquu9206 2018-11-02 07:29 Acceptance rate: 0%

Undefined behavior when loading a large CSV concurrently with goroutines

I am trying to load a large CSV file in Go using goroutines. The CSV has dimensions (254882, 100). But when I parse the CSV in goroutines and store it in a 2D slice, I get fewer than 254882 rows, and the number varies from run to run. I suspect the goroutines are the cause, but I can't pinpoint the reason. Can anyone please help? I am also new to Go. Here is my code:

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    r := csv.NewReader(bufio.NewReader(f))
    counter := 0
    var wg sync.WaitGroup
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if counter != 0 {
            wg.Add(1)
            go func(r []string, dataset *[][]float64) {
                var temp []float64
                for _, each := range record {
                    f, err := strconv.ParseFloat(each, 64)
                    if err == nil {
                        temp = append(temp, f)
                    }
                }
                *dataset = append(*dataset, temp)
                wg.Done()
            }(record, &dataset)
        }
        counter++
    }
    wg.Wait()
    duration := time.Now().Sub(startTime)
    log.Printf("Loaded %d rows in %v seconds", counter, duration)
    return &dataset, nil
}

And my main function looks like this:

func main() {
    // runtime.GOMAXPROCS(4)
    dataset, err := loadCSV("AvgW2V_train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(*dataset))
}

If anyone needs the CSV as well, it can be downloaded here (485 MB): https://drive.google.com/file/d/1G4Nw6JyeC-i0R1exWp5BtRtGM1Fwyelm/view?usp=sharing

2 answers

  • douguo7431 2018-11-02 08:19

    There is no need to use *[][]float64. A slice already refers to its backing array, so a pointer to a slice is effectively a double pointer.
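
    As a minimal sketch of why the pointer is unnecessary: a slice value is a small header (pointer, length, capacity), so passing or returning it by value does not copy the rows. The helper name appendRow is hypothetical and not part of the original program.

```go
package main

import "fmt"

// appendRow is a hypothetical helper. It takes the slice by value and
// returns the (possibly reallocated) slice, the same way append itself works.
func appendRow(dataset [][]float64, row []float64) [][]float64 {
	return append(dataset, row)
}

func main() {
	var dataset [][]float64 // nil slice; no pointer needed
	dataset = appendRow(dataset, []float64{1.5, 2.5})
	dataset = appendRow(dataset, []float64{3.5, 4.5})
	fmt.Println(len(dataset), dataset[1][0])
}
```

    The caller only has to keep the returned slice, which is why loadCSV can simply return [][]float64.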

    I have made some minor modifications to your program.

    dataset is visible inside the new goroutine because it is declared in the enclosing scope.

    Similarly, record is also visible, but because the record variable changes on every iteration, we need to pass it to the new goroutine as an argument.

    There is no need to pass dataset: all goroutines should share the same variable, which is exactly what we want so that each of them can append temp to it.
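
    To illustrate the point about passing record, here is a small sketch under stated assumptions: firstFields is a hypothetical function, the input records are assumed to be non-empty, and the variable record is deliberately declared once and reassigned, much like the current row of a reader. Passing the value as an argument gives each goroutine its own copy regardless of how the loop variable is scoped (per-iteration loop variables only arrived in Go 1.22).

```go
package main

import (
	"fmt"
	"sync"
)

// firstFields starts one goroutine per record and hands each record over
// as an argument, so every goroutine works on its own copy of the slice header.
// Assumption: every record has at least one field.
func firstFields(records [][]string) []string {
	out := make([]string, len(records))
	var wg sync.WaitGroup
	var record []string // shared and reassigned on every iteration
	for i := 0; i < len(records); i++ {
		record = records[i]
		wg.Add(1)
		// The arguments are evaluated here, in the calling goroutine,
		// before record is reassigned by the next iteration.
		go func(i int, rec []string) {
			defer wg.Done()
			out[i] = rec[0] // each goroutine writes to a distinct index
		}(i, record)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(firstFields([][]string{{"a", "1"}, {"b", "2"}, {"c", "3"}}))
}
```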

    But a race condition occurs when multiple goroutines append to the same variable, i.e., when several goroutines write to it concurrently. This is why rows go missing and why the count differs between runs.

    So we need to make sure that only one goroutine can append at any given time. We use a lock to make the appends sequential.

    package main
    
    import (
        "encoding/csv"
        "fmt"
        "os"
        "strconv"
        "sync"
    )
    
    func loadCSV(csvFile string) [][]float64 {
        var dataset [][]float64
    
        f, _ := os.Open(csvFile) // error ignored to keep the example short
        defer f.Close()
    
        r := csv.NewReader(f)
    
        var wg sync.WaitGroup
        l := new(sync.Mutex) // lock
    
        for record, err := r.Read(); err == nil; record, err = r.Read() {
            wg.Add(1)
    
            go func(record []string) {
                defer wg.Done()
    
                var temp []float64
                for _, each := range record {
                    if f, err := strconv.ParseFloat(each, 64); err == nil {
                        temp = append(temp, f)
                    }
                }
                l.Lock() // lock before writing
                dataset = append(dataset, temp) // write
                l.Unlock() // unlock
    
            }(record)
        }
    
        wg.Wait()
    
        return dataset
    }
    
    func main() {
        dataset := loadCSV("train.csv")
        fmt.Println(len(dataset))
    }
    

    Some errors are left unhandled to keep the example minimal, but you should handle them in real code.

    This answer was selected as the best answer by the asker.