I scripted up a simple little example that inserts 10million records into a mongodb. I started out by making it work sequentially. Then I looked up how to do concurrency, and found goroutines. This seems like what I want, but it's not behaving as I would expect. I implemented a WaitGroup to block the program from exiting before all the goroutines were finished, but I'm still having a problem.
So I'll start with what's happening then show the code. When I run the code without the goroutine all 10million records insert in mongodb fine. However, when I add the goroutine some indeterminate amount get entered.. generally around 8500 give or take a couple hundred. I checked the mongodb log to see if it was having problems and nothing is showing up. So I'm not sure it's that, could be, just not being logged. Anyway, here's the code:
(Side note: I'm doing 1 record at a time but I've split it out to a method so I can test out multiple records at a time in the future.. just haven't figured out how to do it with mongodb yet.)
package main
import (
"fmt"
"labix.org/v2/mgo"
"strconv"
"time"
"sync"
)
// structs
type Reading struct {
Id string
Name string
}
var waitGroup sync.WaitGroup
// methods
func main() {
// Setup timer
startTime := time.Now()
// Setup collection
collection := getCollection("test", "readings")
fmt.Println("collection complete: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
// Setup readings
readings := prepareReadings()
fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
// Insert readings
for i := 1; i <= 1000000; i++ {
waitGroup.Add(1)
go insertReadings(collection, readings)
// fmt.Print(".")
if i % 1000 == 0 {
fmt.Println("1000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
}
waitGroup.Wait()
fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
func getCollection(databaseName string, tableName string) *mgo.Collection {
session, err := mgo.Dial("localhost")
if err != nil {
// panic(err)
fmt.Println("error getCollection:", err)
}
// defer session.Close()
// Optional. Switch the session to a monotonic behavior.
// session.SetMode(mgo.Monotonic, true)
collection := session.DB(databaseName).C(tableName)
return collection
}
func insertReadings(collection *mgo.Collection, readings []Reading) {
err := collection.Insert(readings)
if err != nil {
// panic(err)
fmt.Println("error insertReadings:", err)
}
waitGroup.Done()
}
func prepareReadings() []Reading {
var readings []Reading
for i := 1; i <= 1; i++ {
readings = append(readings, Reading{Name: "Thing"})
}
return readings
}