douzhu6149 2018-01-23 12:42
浏览 33
已采纳

使用Golang及时提交数据库

I successfully "batched" many statements in lots of 500-1000 rows to be Inserted at once. However this was using simple for loop and manually setting it to 500-1000 loops. Something like:

for i:=0;i<500;i++ {
   // Create a string of 500 values to be inserted at once
}
// Insert the 500 values

Is there a way I could timely commit() like: "commit each seconds" ?

Conceptually, I'd like to have something like;

// Create connection to DB
// Begin a transaction
// Prepare a statement

go timelyCommits(tx)  // spawn a commit ticker
for {
   // Constantly create string of values to be inserted like:
   // Values (1, "one"),(2,"two"),(3,"three")...(1000,"thousand")...
   // Insert without commit
}

func timelyCommits(tx){
   for {
      time.Sleep(1 * time.Second)
      tx.Commit()
   }
}
  • 写回答

2条回答 默认 最新

  • duangan9251 2018-01-24 06:52
    关注

    Optimization is not a trivial task and may also involving database tuning, etc. Without detail knowledge about the system that you're trying to implement, it is difficult to give a proper advice. In addition to what already suggested in the answer, you may need to implement a kind of buffering, e.g. with channel which has fixed capacity. Then when buffer is FULL or timer EXPIRED, build the query then perform BULK INSERT in transaction. Try it at The Go Playground.

    package main
    
    import (
        "fmt"
        "time"
    )
    
    type DataBuffer struct {
        capacity int
        duration time.Duration
    
        incomingData chan interface{}
        full chan bool
        mustExit chan bool
        done chan bool
    
        query string
        args []interface{}
    }
    
    func NewDataBuffer(capacity int, dur time.Duration) *DataBuffer {
        buf := &DataBuffer {
            incomingData: make(chan interface{}, capacity),
            full: make(chan bool),
            args: make([]interface{}, capacity),
            duration: dur,
            mustExit: make(chan bool, 1),
            done: make(chan bool, 1),
        }
        return buf
    }
    
    func (b *DataBuffer) Append(d interface{}) {
        if !b.put(d) {
            //Notify that buffer is full
            //<- will wait until space available
            b.full <- true
            b.incomingData <- d
        }
    }
    
    func (b *DataBuffer) put(d interface{}) bool {
        //Try to append the data
        //If channel is full, do nothing, then return false
        select {
        case b.incomingData <- d:
            return true
        default:
            //channel is full
            return false
        }
    }
    
    func (b *DataBuffer) execTransaction() error {
        /*
            Begin transaction
            Insert Data Group 
            Commit/rollback
        */
    
        fmt.Print(time.Now())
        fmt.Println(b.query)
        fmt.Println(b.args)
    
        return nil
    }
    
    func (b *DataBuffer) clear() {
        //clear args
        nOldArg := len(b.args)
        for k := 0; k < nOldArg; k++ {
            b.args[k] = nil
        }
        b.args = b.args[:0]
        b.query = ""
    }
    
    func (b *DataBuffer) buildQuery() bool {
        ndata := len(b.incomingData)
        if ndata == 0 {
            return false
        }
    
        k := 0
        b.clear()
    
        //Build the query, adjust as needed
        b.query = "QUERY:"
        for data := range b.incomingData {
            b.query += fmt.Sprintf(" q%d", k) //build the query
            b.args = append(b.args, data)
    
            k++
            if k >= ndata {
                break
            }
    
        }
        return true
    }
    
    func (b *DataBuffer) doInsert() {
        if b.buildQuery() {
            b.execTransaction()
        }
    }
    
    func (b *DataBuffer) runAsync() {
        defer func() {
            b.doInsert()
            fmt.Println("Last insert")
            close(b.done)
        }()
    
        timer := time.NewTimer(b.duration)
        for {
            select {
            case <- timer.C:
                b.doInsert()
                fmt.Println("Timer Expired")
                timer.Reset(b.duration)
            case <- b.full:
                if !timer.Stop() {
                    <-timer.C
                }
                b.doInsert()
                fmt.Println("Full")
                timer.Reset(b.duration)
            case <- b.mustExit:
                if !timer.Stop() {
                    <-timer.C
                }
                return  
            }
        }
    }
    
    func (b *DataBuffer) Run() {
        go b.runAsync()
    }
    func (b *DataBuffer) Stop() {
        b.mustExit <- true
    }
    
    func (b *DataBuffer) WaitDone() {
        <- b.done
    }
    
    func main() {
        buf := NewDataBuffer(5, 1*time.Second)
        buf.Run()
    
        //simulate incoming data
        for k := 0; k < 30; k++ {
            buf.Append(k)
            time.Sleep(time.Duration(10*k)*time.Millisecond)
        }
        buf.Stop()
        buf.WaitDone()  
    }
    

    Note:

    • You need to implement proper error handling.
    • The type of incomingData may be adjusted to your need
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥50 如何用脚本实现输入法的热键设置
  • ¥20 我想使用一些网络协议或者部分协议也行,主要想实现类似于traceroute的一定步长内的路由拓扑功能
  • ¥30 深度学习,前后端连接
  • ¥15 孟德尔随机化结果不一致
  • ¥15 apm2.8飞控罗盘bad health,加速度计校准失败
  • ¥15 求解O-S方程的特征值问题给出边界层布拉休斯平行流的中性曲线
  • ¥15 谁有desed数据集呀
  • ¥20 手写数字识别运行c仿真时,程序报错错误代码sim211-100
  • ¥15 关于#hadoop#的问题
  • ¥15 (标签-Python|关键词-socket)