douren5898 2018-12-20 10:28

Correctly batching items from an input channel

Use case

I want to persist a lot of data, which I receive via a channel, in a MySQL database. For performance reasons I process the items in batches of 10. I receive input items only every 3 hours.

The problem

Assuming I get 10004 items, 4 items will be left over, because my goroutine waits for 10 items before it flushes them as a batch. I want it to create a batch with fewer than 10 items when there are no more items in the channel (the producer closes the channel at that point).

Code:

// ProcessAudits sends the given audits in batches to SQL
func ProcessAudits(done <-chan bq.Audit) {
    var audits []bq.Audit
    for auditRow := range done {
        user := auditRow.UserID.StringVal
        log.Infof("Received audit %s", user)
        audits = append(audits, auditRow)

        if len(audits) == 10 {
            upsertBigQueryAudits(audits)
            audits = []bq.Audit{}
        }
    }
}

I am new to Go and am not sure how to implement that properly.


2 answers

  • duanpi7578 2018-12-20 10:48

    Here's a working example. When the channel is closed and all remaining values have been received, the range loop exits, so you can process any leftover items right after the loop.

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    type Audit struct {
        ID int
    }
    
    func upsertBigQueryAudits(audits []Audit) {
        fmt.Printf("Processing batch of %d\n", len(audits))
        for _, a := range audits {
            fmt.Printf("%d ", a.ID)
        }
        fmt.Println()
    }
    
    func processAudits(audits <-chan Audit, batchSize int) {
        var batch []Audit
        for audit := range audits {
            batch = append(batch, audit)
            if len(batch) == batchSize {
                upsertBigQueryAudits(batch)
                batch = []Audit{}
            }
        }
        if len(batch) > 0 {
            upsertBigQueryAudits(batch)
        }
    }
    
    func produceAudits(x int, to chan Audit) {
        for i := 0; i < x; i++ {
            to <- Audit{
                ID: i,
            }
        }
    }
    
    const batchSize = 10
    
    func main() {
        var wg sync.WaitGroup
        audits := make(chan Audit)
        wg.Add(1)
        go func() {
            defer wg.Done()
            processAudits(audits, batchSize)
        }()
        wg.Add(1)
        go func() {
            defer wg.Done()
            produceAudits(25, audits)
            close(audits)
        }()
        wg.Wait()
        fmt.Println("Complete")
    }
    

    Output:

    Processing batch of 10
    0 1 2 3 4 5 6 7 8 9
    Processing batch of 10
    10 11 12 13 14 15 16 17 18 19
    Processing batch of 5
    20 21 22 23 24
    Complete
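
The same flush-after-the-loop idea can be checked against the 10004-item case from the question. The following is only a minimal sketch: `collectBatches` is a hypothetical helper that is not part of the code above, and it uses plain `int` values in place of `Audit` so that the resulting batches are easy to inspect.

```go
package main

import "fmt"

// collectBatches is a hypothetical helper for illustration: it reads ints
// from in until the channel is closed and groups them into slices of at
// most size elements.
func collectBatches(in <-chan int, size int) [][]int {
	var batches [][]int
	batch := make([]int, 0, size)
	for v := range in {
		batch = append(batch, v)
		if len(batch) == size {
			batches = append(batches, batch)
			// Start a fresh slice so the stored batch is not overwritten.
			batch = make([]int, 0, size)
		}
	}
	// The range loop ends only after the channel has been closed and
	// drained, so anything left here is the final, partial batch.
	if len(batch) > 0 {
		batches = append(batches, batch)
	}
	return batches
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in) // the producer closes the channel when it is done
		for i := 0; i < 10004; i++ {
			in <- i
		}
	}()
	batches := collectBatches(in, 10)
	// 1000 full batches plus one batch holding the 4 leftover items.
	fmt.Println(len(batches), len(batches[len(batches)-1])) // prints "1001 4"
}
```

Allocating a new slice for each batch, rather than truncating the old one with `batch[:0]`, keeps earlier batches intact if the consumer holds on to them. If the upsert function never retains the slice, reusing the backing array would presumably also work and saves allocations.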
    
    This answer was selected by the asker as the best answer.