douzhu6149 2018-01-23 12:42
浏览 33
已采纳

使用Golang及时提交数据库

I successfully "batched" many statements in lots of 500-1000 rows to be inserted at once. However, this used a simple for loop that I manually set to run 500-1000 iterations. Something like:

for i:=0;i<500;i++ {
   // Create a string of 500 values to be inserted at once
}
// Insert the 500 values

Is there a way to call commit() on a timer, e.g. "commit every second"?

Conceptually, I'd like to have something like:

// Create connection to DB
// Begin a transaction
// Prepare a statement

go timelyCommits(tx)  // spawn a commit ticker
for {
   // Constantly create string of values to be inserted like:
   // Values (1, "one"),(2,"two"),(3,"three")...(1000,"thousand")...
   // Insert without commit
}

func timelyCommits(tx){
   for {
      time.Sleep(1 * time.Second)
      tx.Commit()
   }
}
  • 写回答

2条回答 默认 最新

  • duangan9251 2018-01-24 06:52
    关注

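    Optimization is not a trivial task and may also involve database tuning, etc. Without detailed knowledge of the system you're trying to implement, it is difficult to give proper advice. Note that the approach sketched in the question cannot work as written: once tx.Commit() has been called, the transaction is finished (further operations on it fail with sql.ErrTxDone), so each batch needs its own transaction. In addition to what has already been suggested in the other answer, you may need to implement some kind of buffering, e.g. with a channel that has a fixed capacity. Then, when the buffer is FULL or the timer has EXPIRED, build the query and perform a BULK INSERT in a transaction. Try it at The Go Playground.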

    package main
    
    import (
        "fmt"
        "strings"
        "time"
    )
    
    // DataBuffer collects appended values and hands them to execTransaction in
    // batches, flushing when the buffer is full or when its timer expires.
    type DataBuffer struct {
        // Flush policy.
        capacity int           // batch capacity setting
        duration time.Duration // period of the flush timer used by runAsync

        // Values waiting for the next flush.
        incomingData chan interface{}

        // Signals: full (Append -> runAsync: buffer is full),
        // mustExit (Stop -> runAsync: exit), done (closed by runAsync for WaitDone).
        full, mustExit, done chan bool

        // Query text and arguments of the batch currently being flushed.
        query string
        args  []interface{}
    }
    
    // NewDataBuffer returns a DataBuffer that holds up to capacity pending values
    // and is flushed when an Append finds it full or when dur elapses.
    // Call Run to start the flushing goroutine.
    //
    // capacity is raised to 1 if smaller. With an unbuffered incomingData channel,
    // the non-blocking put always fails and nothing receives outside buildQuery,
    // so every Append would block forever. A negative value would panic in make.
    func NewDataBuffer(capacity int, dur time.Duration) *DataBuffer {
        if capacity < 1 {
            capacity = 1
        }
        // args starts empty with room for one full batch; buildQuery appends into it.
        return &DataBuffer{
            capacity:     capacity,
            duration:     dur,
            incomingData: make(chan interface{}, capacity),
            full:         make(chan bool),
            args:         make([]interface{}, 0, capacity),
            mustExit:     make(chan bool, 1),
            done:         make(chan bool, 1),
        }
    }
    
    // Append queues d for the next flush. If the buffer has no free slot, it first
    // signals the flushing goroutine on b.full and then blocks until d fits.
    //
    // NOTE(review): runAsync is the only receiver of b.full, so an Append that
    // finds the buffer full after runAsync has returned blocks forever.
    func (b *DataBuffer) Append(d interface{}) {
        if b.put(d) {
            return
        }
        // Buffer full: wake the flusher (waits until it is listening), then
        // wait for free space.
        b.full <- true
        b.incomingData <- d
    }
    
    // put attempts a non-blocking enqueue of d into incomingData and reports
    // whether it succeeded. false means the send could not proceed right now.
    func (b *DataBuffer) put(d interface{}) bool {
        select {
        case b.incomingData <- d:
        default:
            return false // no room; the caller decides whether to wait
        }
        return true
    }
    
    // execTransaction is a placeholder for the real database work. It is meant to
    // begin a transaction, insert the batch (b.query with b.args), then commit or
    // roll back. For now it prints the current time immediately followed by the
    // query, then the arguments on their own line, and always returns nil.
    func (b *DataBuffer) execTransaction() error {
        fmt.Printf("%v%v\n%v\n", time.Now(), b.query, b.args)
        return nil
    }
    
    // clear empties the query text and the argument list. It keeps the args
    // backing array for reuse by the next batch.
    func (b *DataBuffer) clear() {
        b.query = ""
        // Nil out old entries so the values they reference can be collected even
        // though the backing array itself is retained.
        for i := range b.args {
            b.args[i] = nil
        }
        b.args = b.args[:0]
    }
    
    // buildQuery drains the values buffered at call time from incomingData into
    // b.args and builds the matching query text in b.query. It returns false,
    // leaving b.query and b.args untouched, when nothing is buffered.
    //
    // Within this program only the runAsync goroutine (via doInsert) receives
    // from incomingData. The ndata values counted up front are therefore available
    // and the receives below never block. Values appended after the count stay
    // buffered for the next flush.
    func (b *DataBuffer) buildQuery() bool {
        ndata := len(b.incomingData)
        if ndata == 0 {
            return false
        }
        b.clear()

        // Build the query (adjust as needed). strings.Builder avoids the quadratic
        // copying of repeated string +=, which adds up for large batches.
        var sb strings.Builder
        sb.WriteString("QUERY:")
        for k := 0; k < ndata; k++ {
            fmt.Fprintf(&sb, " q%d", k)
            b.args = append(b.args, <-b.incomingData)
        }
        b.query = sb.String()
        return true
    }
    
    // doInsert flushes whatever is currently buffered: it builds the query from
    // the pending values and executes it. It does nothing when the buffer is empty.
    // A failed execTransaction is reported instead of being silently discarded.
    // The drained values are not retried.
    func (b *DataBuffer) doInsert() {
        if !b.buildQuery() {
            return
        }
        if err := b.execTransaction(); err != nil {
            fmt.Println("insert failed:", err)
        }
    }
    
    // runAsync is the flushing loop that Run starts in a separate goroutine. It
    // keeps looping until a value arrives on b.mustExit (sent by Stop).
    //
    // It calls doInsert, which flushes whatever is buffered and is a no-op when
    // nothing is, in three situations:
    //   - the timer fires (b.duration after it was last armed),
    //   - Append reports a full buffer on b.full,
    //   - once more on exit, from the deferred function.
    // The timer is re-armed after each in-loop flush, so the next timed flush is
    // measured from the previous flush. The deferred function closes b.done after
    // the final flush; WaitDone waits for that.
    func (b *DataBuffer) runAsync() {
        defer func() {
            // Final flush of anything appended before Stop, then release WaitDone.
            b.doInsert()
            fmt.Println("Last insert")
            close(b.done)
        }()
    
        timer := time.NewTimer(b.duration)
        for {
            select {
            case <- timer.C:
                // Periodic flush. This prints even when nothing was buffered.
                b.doInsert()
                fmt.Println("Timer Expired")
                timer.Reset(b.duration)
            case <- b.full:
                // Stop returns false when the timer has already fired. Receive
                // that pending tick so it is not taken as a fresh expiry right
                // after Reset (the drain idiom from the time.Timer.Stop docs).
                if !timer.Stop() {
                    <-timer.C
                }
                b.doInsert()
                fmt.Println("Full")
                timer.Reset(b.duration)
            case <- b.mustExit:
                // Same stop-and-drain as above before leaving. The deferred
                // function performs the final flush.
                if !timer.Stop() {
                    <-timer.C
                }
                return  
            }
        }
    }
    
    // Run starts the background flushing loop (runAsync) in its own goroutine
    // and returns immediately. Pair it with Stop and WaitDone.
    func (b *DataBuffer) Run() {
        go func() {
            b.runAsync()
        }()
    }
    // Stop asks the flushing goroutine to perform a final flush and exit. Use
    // WaitDone to wait for that final flush to complete.
    //
    // Stop never blocks. NewDataBuffer gives mustExit a one-slot buffer. If that
    // slot already holds a stop request, another one adds nothing, so it is
    // dropped instead of blocking the caller forever (previously, repeated Stop
    // calls could deadlock once the loop had exited).
    func (b *DataBuffer) Stop() {
        select {
        case b.mustExit <- true:
        default:
        }
    }
    
    // WaitDone blocks until the flushing goroutine has finished its final flush
    // and closed the done channel.
    func (b *DataBuffer) WaitDone() {
        done := b.done
        <-done
    }
    
    // main demonstrates the buffer with capacity 5 and a 1-second flush timer.
    // Thirty integers are appended with a pause that grows by 10ms per item.
    // This should exercise both the "buffer full" and the "timer expired" flush
    // paths before the final flush on Stop.
    func main() {
        const (
            bufferCapacity = 5
            items          = 30
        )

        buf := NewDataBuffer(bufferCapacity, time.Second)
        buf.Run()

        // Simulate incoming data arriving at a decreasing rate.
        for i := 0; i < items; i++ {
            buf.Append(i)
            pause := time.Duration(10*i) * time.Millisecond
            time.Sleep(pause)
        }

        buf.Stop()
        buf.WaitDone()
    }
    

    Note:

    • You need to implement proper error handling.
    • The type of incomingData may be adjusted to your needs.
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?