Optimization is not a trivial task and may also involving database tuning, etc. Without detail knowledge about the system that you're trying to implement, it is difficult to give a proper advice. In addition to what already suggested in the answer, you may need to implement a kind of buffering, e.g. with channel which has fixed capacity. Then when buffer is FULL or timer EXPIRED, build the query then perform BULK INSERT in transaction. Try it at The Go Playground.
package main
import (
"fmt"
"time"
)
type DataBuffer struct {
capacity int
duration time.Duration
incomingData chan interface{}
full chan bool
mustExit chan bool
done chan bool
query string
args []interface{}
}
func NewDataBuffer(capacity int, dur time.Duration) *DataBuffer {
buf := &DataBuffer {
incomingData: make(chan interface{}, capacity),
full: make(chan bool),
args: make([]interface{}, capacity),
duration: dur,
mustExit: make(chan bool, 1),
done: make(chan bool, 1),
}
return buf
}
func (b *DataBuffer) Append(d interface{}) {
if !b.put(d) {
//Notify that buffer is full
//<- will wait until space available
b.full <- true
b.incomingData <- d
}
}
func (b *DataBuffer) put(d interface{}) bool {
//Try to append the data
//If channel is full, do nothing, then return false
select {
case b.incomingData <- d:
return true
default:
//channel is full
return false
}
}
func (b *DataBuffer) execTransaction() error {
/*
Begin transaction
Insert Data Group
Commit/rollback
*/
fmt.Print(time.Now())
fmt.Println(b.query)
fmt.Println(b.args)
return nil
}
func (b *DataBuffer) clear() {
//clear args
nOldArg := len(b.args)
for k := 0; k < nOldArg; k++ {
b.args[k] = nil
}
b.args = b.args[:0]
b.query = ""
}
func (b *DataBuffer) buildQuery() bool {
ndata := len(b.incomingData)
if ndata == 0 {
return false
}
k := 0
b.clear()
//Build the query, adjust as needed
b.query = "QUERY:"
for data := range b.incomingData {
b.query += fmt.Sprintf(" q%d", k) //build the query
b.args = append(b.args, data)
k++
if k >= ndata {
break
}
}
return true
}
func (b *DataBuffer) doInsert() {
if b.buildQuery() {
b.execTransaction()
}
}
func (b *DataBuffer) runAsync() {
defer func() {
b.doInsert()
fmt.Println("Last insert")
close(b.done)
}()
timer := time.NewTimer(b.duration)
for {
select {
case <- timer.C:
b.doInsert()
fmt.Println("Timer Expired")
timer.Reset(b.duration)
case <- b.full:
if !timer.Stop() {
<-timer.C
}
b.doInsert()
fmt.Println("Full")
timer.Reset(b.duration)
case <- b.mustExit:
if !timer.Stop() {
<-timer.C
}
return
}
}
}
func (b *DataBuffer) Run() {
go b.runAsync()
}
func (b *DataBuffer) Stop() {
b.mustExit <- true
}
func (b *DataBuffer) WaitDone() {
<- b.done
}
func main() {
buf := NewDataBuffer(5, 1*time.Second)
buf.Run()
//simulate incoming data
for k := 0; k < 30; k++ {
buf.Append(k)
time.Sleep(time.Duration(10*k)*time.Millisecond)
}
buf.Stop()
buf.WaitDone()
}
Note:
- You need to implement proper error handling.
- The type of
incomingData may be adjusted to your need