douri4459 2017-08-24 23:41
浏览 73

如何使用渠道在golang管道阶段批处理项目?

I'm reading the pipelines tutorial online and trying to construct a stage that operates like this --

  1. Batches up incoming events in batches of 10 each before sending them to the out chan
  2. If we haven't seen 10 events in 5 seconds, combine as many as we received and send them, closing the out chan and returning.

However, I have no idea what would the first select case would look like.Tried multiple things but couldn't get past this. Any pointers much appreciated!

func BatchEvents(inChan <- chan *Event) <- chan *Event {
    batchSize := 10
    comboEvent := Event{}
    go func() {
        defer close(out)
        i = 0
        for event := range inChan {
            select {
            case -WHAT GOES HERE?-:
                if i < batchSize {
                    comboEvent.data = append(comboEvent.data, event.data)
                    i++;
                } else {
                    out <- &comboEvent
                    // reset for next batch
                    comboEvent = Event{}
                    i=0;
                }
            case <-time.After(5 * time.Second):
                // process whatever we have seen so far if the batch size isn't filled in 5 secs
                out <- &comboEvent
                // stop after
                return
            }
        }
    }()
    return out
}
  • 写回答

1条回答 默认 最新

  • dsd30433 2017-08-25 00:08
    关注

    Instead of doing a range over the channel, your first select case should be from that channel, with the whole thing inside an infinite loop.

    func BatchEvents(inChan <-chan *Event) <-chan *Event {
        batchSize := 10
        comboEvent := Event{}
        go func() {
            defer close(out)
            i = 0
            for {
                select {
                case event, ok := <-inChan:
                    if !ok {
                        return
                    }
                    comboEvent.data = append(comboEvent.data, event.data)
                    i++
                    if i == batchSize {
                        out <- &comboEvent
                        // reset for next batch
                        comboEvent = Event{}
                        i = 0
                    }
                case <-time.After(5 * time.Second):
                    // process whatever we have seen so far if the batch size isn't filled in 5 secs
                    if i > 0 {
                        out <- &comboEvent
                    }
                    // stop after
                    return
                }
            }
        }()
        return out
    }
    
    评论

报告相同问题?

悬赏问题

  • ¥20 完全没有学习过GAN,看了CSDN的一篇文章,里面有代码但是完全不知道如何操作
  • ¥15 使用ue5插件narrative时如何切换关卡也保存叙事任务记录
  • ¥20 软件测试决策法疑问求解答
  • ¥15 win11 23H2删除推荐的项目,支持注册表等
  • ¥15 matlab 用yalmip搭建模型,cplex求解,线性化处理的方法
  • ¥15 qt6.6.3 基于百度云的语音识别 不会改
  • ¥15 关于#目标检测#的问题:大概就是类似后台自动检测某下架商品的库存,在他监测到该商品上架并且可以购买的瞬间点击立即购买下单
  • ¥15 神经网络怎么把隐含层变量融合到损失函数中?
  • ¥15 lingo18勾选global solver求解使用的算法
  • ¥15 全部备份安卓app数据包括密码,可以复制到另一手机上运行