I'm using the official MongoDB
driver for Go. I read a CSV line by line until I reach 1000 lines, then parse the data and insert it into the DB. I assumed memory usage would stay roughly constant, since the data passed to each bulk write is always the same size (1000 contacts). However, that is not the case: memory usage increases significantly as more contacts are processed. Here's some data for the runs described above:
batchSize = 1000
Contacts - Memory consumed by bulkwrite
10K - 14 MB
20K - 30 MB
30K - 59 MB
40K - 137 MB
50K - 241 MB
Can anyone explain why?
Here's the code:
// processCSV streams the CSV from r and persists it to emailDB in
// fixed-size batches. It keeps calling processCSVBatch until that
// helper reports it has consumed the final record, returning the
// first error encountered (wrapped with context).
func (c *csvProcessor) processCSV(r io.Reader, headerMap map[string]int, emailDB *mongo.Database) error {
	//some code...
	reader := csv.NewReader(r)
	for done := false; !done; {
		var err error
		done, err = c.processCSVBatch(reader, emailHash, smsHash, headerMap, emailDB)
		if err != nil {
			return errors.Wrap(err, "process CSV batch")
		}
	}
	return nil
}
// processCSVBatch reads up to processCSVBatchSize records from csvReader,
// merges each parsed user against the email/SMS lookup maps, and flushes
// the resulting insert/update/delete sets to the database in one call.
//
// It returns true when the reader hit io.EOF (the caller should stop
// looping), and a wrapped error on any read or save failure.
func (c *csvProcessor) processCSVBatch(csvReader *csv.Reader, emailHash map[string]*userData, smsHash map[string]*userData, headerMap map[string]int, emailDB *mongo.Database) (bool, error) {
	// Pre-size to the batch bound so append never reallocates within a batch.
	insertUsers := make([]interface{}, 0, processCSVBatchSize)
	updateUsers := make([]interface{}, 0, processCSVBatchSize)
	deleteUsers := make([]interface{}, 0, processCSVBatchSize)
	var isEOFReached bool
	for i := 0; i < processCSVBatchSize; i++ {
		line, err := csvReader.Read()
		if err != nil {
			if err != io.EOF {
				return false, errors.Wrap(err, "read from input")
			}
			isEOFReached = true
			break
		}
		//some code

		// NOTE(review): if headerMap has no "email" key, the zero-value
		// index 0 is used silently — verify the header is validated upstream.
		// NOTE(review): each Read allocates a fresh record (ReuseRecord is
		// false by default), and any substrings retained by parseUser keep
		// the whole record's backing string alive — a likely contributor to
		// the growing memory footprint; confirm what parseUser retains.
		toInsert, toUpdate, toDelete := c.dataMerger.mergeData(
			c.parseUser(line, headerMap),
			emailHash[stringToMD5(line[headerMap["email"]])],
			smsHashVal,
		)
		if toInsert != nil {
			insertUsers = append(insertUsers, toInsert)
		}
		if toUpdate != nil {
			updateUsers = append(updateUsers, toUpdate)
		}
		// Renamed from "delete": avoid shadowing the builtin delete().
		if toDelete != nil {
			deleteUsers = append(deleteUsers, toDelete)
		}
	}
	//update DB
	err := c.mongoDBUserHandler.saveUsers(emailDB, insertUsers, updateUsers, deleteUsers)
	if err != nil {
		return false, errors.Wrap(err, "save users")
	}
	return isEOFReached, nil
}
// saveUsers converts the insert/update/delete sets into mongo write models
// and applies them to the user collection with a single unordered BulkWrite.
// It is a no-op (returns nil) when all three slices are empty.
//
// NOTE(review): context.Background() has no timeout or cancellation — the
// bulk write can block indefinitely; consider accepting a ctx parameter.
func (m *mongoDBUserHandler) saveUsers(emailDB *mongo.Database, insert, update, delete []interface{}) error {
	ctx := context.Background()
	// The final length is known up front: pre-size so the three appends
	// below never reallocate or copy the slice.
	writes := make([]mongo.WriteModel, 0, len(insert)+len(update)+len(delete))
	if len(insert) > 0 {
		writes = append(writes, m.getInsertWrites(insert)...)
	}
	if len(update) > 0 {
		writes = append(writes, m.getUpdateWrites(update)...)
	}
	if len(delete) > 0 {
		writes = append(writes, m.getDeleteWrites(delete)...)
	}
	if len(writes) == 0 {
		return nil
	}
	// Unordered: lets the driver group and execute the models without
	// stopping at the first failed operation.
	_, err := emailDB.
		Collection(userCollection).
		BulkWrite(ctx, writes, options.BulkWrite().SetOrdered(false))
	if err != nil {
		return errors.Wrap(err, "bulk write")
	}
	return nil
}