duangan9251 2019-07-29 07:19

MongoDB BulkWrite memory cost

I'm using the official MongoDB driver for Go. I read a CSV line by line until I reach 1,000 lines, then parse the data and insert it into the database. I assumed memory usage would stay roughly constant, since each bulk write always receives the same amount of data (1,000 contacts). However, that is not the case: memory grows significantly as more contacts are processed. Here are some figures:

batchSize = 1000

Contacts - Memory consumed by BulkWrite
10K - 14 MB
20K - 30 MB
30K - 59 MB
40K - 137 MB
50K - 241 MB

Can anyone explain why?

Here's the code:

func (c *csvProcessor) processCSV(r io.Reader, headerMap map[string]int, emailDB *mongo.Database) error {
    //some code...
    csvReader := csv.NewReader(r)
    for {
        eofReached, err := c.processCSVBatch(csvReader, emailHash, smsHash, headerMap, emailDB)
        if err != nil {
            return errors.Wrap(err, "process CSV batch")
        }
        if eofReached {
            break
        }
    }
    return nil
}

func (c *csvProcessor) processCSVBatch(csvReader *csv.Reader, emailHash map[string]*userData, smsHash map[string]*userData, headerMap map[string]int, emailDB *mongo.Database) (bool, error) {
    var insertUsers, updateUsers, deleteUsers []interface{}
    var isEOFReached bool
    for i := 0; i < processCSVBatchSize; i++ {
        line, err := csvReader.Read()
        if err != nil {
            if err != io.EOF {
                return false, errors.Wrap(err, "read from input")
            }
            isEOFReached = true
            break
        }
        //some code
        insert, update, delete := c.dataMerger.mergeData(
            c.parseUser(line, headerMap),
            emailHash[stringToMD5(line[headerMap["email"]])],
            smsHashVal,
        )
        if insert != nil {
            insertUsers = append(insertUsers, insert)
        }
        if update != nil {
            updateUsers = append(updateUsers, update)
        }
        if delete != nil {
            deleteUsers = append(deleteUsers, delete)
        }
    }
    //update DB
    err := c.mongoDBUserHandler.saveUsers(emailDB, insertUsers, updateUsers, deleteUsers)
    if err != nil {
        return false, errors.Wrap(err, "save users")
    }
    return isEOFReached, nil
}

func (m *mongoDBUserHandler) saveUsers(emailDB *mongo.Database, insert, update, delete []interface{}) error {
    ctx := context.Background()
    // create the slice of write models
    var writes []mongo.WriteModel
    if len(insert) > 0 {
        writes = append(writes, m.getInsertWrites(insert)...)
    }
    if len(update) > 0 {
        writes = append(writes, m.getUpdateWrites(update)...)
    }
    if len(delete) > 0 {
        writes = append(writes, m.getDeleteWrites(delete)...)
    }
    if len(writes) == 0 {
        return nil
    }
    // run bulk write
    _, err := emailDB.
        Collection(userCollection).
        BulkWrite(ctx, writes, options.BulkWrite().SetOrdered(false))
    if err != nil {
        return errors.Wrap(err, "bulk write")
    }
    return nil
}
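
How the figures above were measured isn't shown; a minimal sketch of one way to sample heap usage after each batch, using only the standard library (the logHeap helper below is hypothetical and not part of the code above), would be:

package main

import (
    "fmt"
    "runtime"
)

// logHeap prints the live heap size, cumulative allocations, and GC count.
// Calling it after each saveUsers call shows whether the growth is retained
// data or just allocation churn that the GC will eventually reclaim.
func logHeap(label string) {
    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    fmt.Printf("%s: heap=%d MiB, cumulative alloc=%d MiB, GC cycles=%d\n",
        label, m.HeapAlloc>>20, m.TotalAlloc>>20, m.NumGC)
}

func main() {
    // ...process batches, then after each bulk write:
    logHeap("after batch")
}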

1 Answer

  • drh47606 2019-07-29 14:14

    The lines below are copies in disguise:

        if len(insert) > 0 {
            writes = append(writes, m.getInsertWrites(insert)...)
        }
        if len(update) > 0 {
            writes = append(writes, m.getUpdateWrites(update)...)
        }
        if len(delete) > 0 {
            writes = append(writes, m.getDeleteWrites(delete)...)
        }
    

    Very low-hanging fruit: remove those lines above and change BulkWrite to accept writes as interface{} so that you can reuse the same backing array; that should save you some memory.
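
    A minimal sketch of that idea (assuming a simplified document mapping in place of the question's parseUser/mergeData, and treating every row as an insert for brevity) builds the []mongo.WriteModel directly while reading each batch, with no intermediate []interface{} slices:

        package csvimport

        import (
            "context"
            "encoding/csv"
            "io"

            "go.mongodb.org/mongo-driver/bson"
            "go.mongodb.org/mongo-driver/mongo"
            "go.mongodb.org/mongo-driver/mongo/options"
        )

        const batchSize = 1000

        // processCSV reuses a single []mongo.WriteModel across all batches.
        func processCSV(ctx context.Context, r io.Reader, coll *mongo.Collection) error {
            csvReader := csv.NewReader(r)
            // Allocated once; at most batchSize models are appended per batch,
            // so the backing array never grows and is reused every iteration.
            writes := make([]mongo.WriteModel, 0, batchSize)
            for {
                eof, err := processBatch(ctx, csvReader, coll, writes)
                if err != nil {
                    return err
                }
                if eof {
                    return nil
                }
            }
        }

        // processBatch appends one WriteModel per CSV line straight into writes
        // and hands the slice to BulkWrite.
        func processBatch(ctx context.Context, r *csv.Reader, coll *mongo.Collection, writes []mongo.WriteModel) (bool, error) {
            writes = writes[:0] // keep the backing array, drop the old contents
            eof := false
            for i := 0; i < batchSize; i++ {
                line, err := r.Read()
                if err != nil {
                    if err != io.EOF {
                        return false, err
                    }
                    eof = true
                    break
                }
                // Simplified mapping; the real code would choose between
                // insert/update/delete models via mergeData.
                doc := bson.M{"email": line[0]}
                writes = append(writes, mongo.NewInsertOneModel().SetDocument(doc))
            }
            if len(writes) == 0 {
                return eof, nil
            }
            _, err := coll.BulkWrite(ctx, writes, options.BulkWrite().SetOrdered(false))
            return eof, err
        }

    Because at most batchSize models are appended per call, the slice allocated once in processCSV is never reallocated, so per-batch allocations are limited to the documents and write models themselves.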

