I have a Go utility that reads documents from flat files and bulk-loads them into Couchbase. The utility can insert data at up to 21K writes per second when 2 writer threads run on a single machine (the destination is a remote Couchbase cluster with a single server on the same network).
But when the 2 writer threads run on 2 different machines (1 thread each), the insertion speed drops to half (10K writes/sec). Since each machine uses its own RAM and CPU for the insertion, and the utility has already shown speeds of up to 25K writes per second, the network doesn't seem to be the issue (I checked network utilization as well, and it stays below 50 percent when multiple machines are used).
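(For reference, figures like the ones above can be reproduced by timing the whole run; the sketch below is a simplified stand-in for that measurement, not the actual utility code — the measure helper and the dummy workload are hypothetical.)

package main

import (
    "fmt"
    "time"
)

// measure runs load, which should return the number of documents written,
// and prints the resulting throughput.
func measure(load func() int) {
    start := time.Now()
    n := load()
    elapsed := time.Since(start).Seconds()
    fmt.Printf("%d docs in %.2fs = %.0f writes/sec\n", n, elapsed, float64(n)/elapsed)
}

func main() {
    measure(func() int {
        // Stand-in workload; the real utility would run its writer goroutines here.
        time.Sleep(100 * time.Millisecond)
        return 2100
    })
}
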
Note: all of the machines used have an i7 3.40GHz quad-core processor and 8GB of RAM. The total amount of data being inserted is up to 500MB. Bucket configuration: 5.00GB RAM, bucket disk I/O priority: High.
I need to know what’s causing this speed gap. Please help…
Here is the code:
package main

import (
    "bufio"
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "os"
    "runtime"
    "strconv"
    "sync"

    "gopkg.in/couchbase/gocb.v1"
)

var (
    bucket  *gocb.Bucket
    CB_Host string
)

func main() {
    var wg sync.WaitGroup
    CB_Host = "<IP Address of Couchbase Server>"
    runtime.GOMAXPROCS(runtime.NumCPU())
    cluster, err := gocb.Connect("couchbase://" + CB_Host) //.......... Establish the Couchbase connection
    if err != nil {
        fmt.Println("ERROR CONNECTING COUCHBASE:", err)
        return
    }
    bucket, err = cluster.OpenBucket("BUCKET", "*******")
    if err != nil {
        fmt.Println("ERROR OPENING BUCKET:", err)
        return
    }
    Path := "E:\\Data\\File_" //.......... Path prefix of the text files that contain the data
    for i := 1; i <= 2; i++ {
        wg.Add(1)
        go InsertDataFromFile(Path+strconv.Itoa(i)+".txt", i, &wg)
    }
    wg.Wait()
    err = bucket.Close() //.......... Close the Couchbase connection
    if err != nil {
        fmt.Println("ERROR CLOSING COUCHBASE CONNECTION:", err)
    }
}
/*-- Main function Ends Here --*/
func InsertDataFromFile(Path string, i int, wg *sync.WaitGroup) {
    defer wg.Done()
    var (
        ID       string
        JSONData string
        items    []gocb.BulkOp
    )
    csvFile, err := os.Open(Path) //.......... Open the flat file containing the data
    if err != nil {
        log.Fatal(err)
    }
    defer csvFile.Close()
    reader := csv.NewReader(bufio.NewReader(csvFile))
    reader.Comma = '$'
    reader.LazyQuotes = true
    counter := 1
    fmt.Println("Starting Insertion of File " + strconv.Itoa(i) + "...")
    for {
        line, err := reader.Read()
        if err == io.EOF {
            break
        } else if err != nil {
            log.Fatal(err)
        }
        ID = line[0] //.......... Parse each record and append it to the items slice
        JSONData = line[1]
        items = append(items, &gocb.UpsertOp{Key: ID, Value: JSONData})
        if counter%500 == 0 {
            BulkInsert(&items) //.......... Bulk-insert the next 500 documents into Couchbase
            items = nil
        }
        counter = counter + 1
    }
    BulkInsert(&items) //.......... Insert the remaining documents
    items = nil
    fmt.Println("Insertion of File " + strconv.Itoa(i) + " Completed...")
}

func BulkInsert(item *[]gocb.BulkOp) {
    err := bucket.Do(*item)
    if err != nil {
        fmt.Println("ERROR PERFORMING BULK INSERT:", err)
    }
}
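
One caveat worth knowing while debugging throughput: bucket.Do only returns a top-level error, while each individual operation records its own Err field, so per-document failures (e.g. temporary failures when the server is under load) can pass silently and skew the apparent writes/sec. Below is a variant of BulkInsert that surfaces them — a sketch, not part of the original utility:

func BulkInsertChecked(items []gocb.BulkOp) {
    if err := bucket.Do(items); err != nil {
        fmt.Println("ERROR PERFORMING BULK INSERT:", err)
    }
    // Each BulkOp records its own result; inspect it to catch per-document failures.
    for _, op := range items {
        if upsert, ok := op.(*gocb.UpsertOp); ok && upsert.Err != nil {
            fmt.Println("ERROR UPSERTING", upsert.Key, ":", upsert.Err)
        }
    }
}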