I'm trying to find the most efficient way to read a CSV file (~1M rows). Each row contains an HTTP link to an image which I need to download.
This is my current code, using a worker pool:
import (
    "encoding/csv"
    "flag"
    "fmt"
    "io"
    "os"
    "strings"
    "time"

    "github.com/schollz/progressbar" // import path assumed from the NewOptions/RenderBlank/Add calls
)

// flag definitions were omitted from my paste; something like:
var (
    filename    = flag.String("f", "REQUIRED", "CSV file to read")
    numChannels = flag.Int("c", 8, "number of workers")
)

// worker pulls records off the queue until it gets a kill signal.
func worker(queue chan []string, worknumber int, done, ks chan bool) {
    for {
        select {
        case url := <-queue:
            fmt.Println("doing work!", url, "worknumber", worknumber)
            processData(url) // HTTP download
            done <- true
        case <-ks:
            fmt.Println("worker halted, number", worknumber)
            return
        }
    }
}

func main() {
    start := time.Now()
    flag.Parse()
    fmt.Print(strings.Join(flag.Args(), "\n"))
    if *filename == "REQUIRED" {
        return
    }

    csvfile, err := os.Open(*filename)
    if err != nil {
        fmt.Println(err)
        return
    }

    count, _ := lineCounter(csvfile)
    fmt.Printf("Total count: %d\n", count)
    csvfile.Seek(0, 0) // rewind: lineCounter consumed the file
    defer csvfile.Close()

    //bar := pb.StartNew(count)
    bar := progressbar.NewOptions(count)
    bar.RenderBlank()

    reader := csv.NewReader(csvfile)

    // channel for terminating the workers
    killsignal := make(chan bool)
    // queue of jobs
    q := make(chan []string)
    // done channel takes the result of the job
    done := make(chan bool)

    numberOfWorkers := *numChannels
    for i := 0; i < numberOfWorkers; i++ {
        go worker(q, i, done, killsignal)
    }

    i := 0
    for {
        record, err := reader.Read()
        if err == io.EOF {
            break
        } else if err != nil {
            fmt.Println(err)
            return
        }
        i++
        // a new goroutine per record, just to feed the queue
        go func(r []string, i int) {
            q <- r
            bar.Add(1)
        }(record, i)
    }

    // a deadlock occurs if c >= numberOfJobs
    for c := 0; c < count; c++ {
        <-done
    }

    fmt.Println("finished")

    // cleaning workers
    close(killsignal)
    time.Sleep(2 * time.Second)
    fmt.Printf("\n%.2fs", time.Since(start).Seconds())
}
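lineCounter isn't shown above; it just counts the lines so the total is known up front. A minimal version (my assumption of roughly what it does: counting newline bytes in chunks; needs "bytes" in the imports) would be:

// lineCounter counts '\n' bytes in 32KB chunks.
func lineCounter(r io.Reader) (int, error) {
    buf := make([]byte, 32*1024)
    count := 0
    for {
        n, err := r.Read(buf)
        count += bytes.Count(buf[:n], []byte{'\n'})
        if err == io.EOF {
            return count, nil
        }
        if err != nil {
            return count, err
        }
    }
}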
My issue here is that it spawns a goroutine per record just to feed the queue, so it ends up opening a huge number of goroutines, uses all the memory, and crashes.
What would be the best way to limit it?
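One idea I had (a rough, untested sketch, not my working code) is to drop the per-record goroutine entirely and send to the queue straight from the read loop, so the send blocks whenever all workers are busy and at most numberOfWorkers goroutines ever exist; a sync.WaitGroup would then replace the done-counting loop. Here run is just a made-up name, and processData and the progress bar are the same as above, with "sync" added to the imports:

// run is a hypothetical restructuring. The read loop sends straight
// into q, so it blocks while all workers are busy; only the fixed
// pool of worker goroutines is ever alive.
func run(reader *csv.Reader, numberOfWorkers int, bar *progressbar.ProgressBar) {
    q := make(chan []string)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for record := range q { // exits once q is closed and drained
                processData(record) // HTTP download, as before
                bar.Add(1)
            }
        }()
    }

    for {
        record, err := reader.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Println(err)
            break
        }
        q <- record // blocks until a worker is free
    }
    close(q) // signals the workers that no more jobs are coming
    wg.Wait()
}

Is that the right approach, or is there a more idiomatic pattern for bounding concurrency here?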