I'm reading the pipelines tutorial online and trying to construct a stage that operates like this --
- Batches up incoming events in batches of 10 each before sending them to the out chan
- If we haven't seen 10 events in 5 seconds, combine as many as we received and send them, closing the out chan and returning.
However, I have no idea what would the first select case would look like.Tried multiple things but couldn't get past this. Any pointers much appreciated!
func BatchEvents(inChan <- chan *Event) <- chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i = 0
for event := range inChan {
select {
case -WHAT GOES HERE?-:
if i < batchSize {
comboEvent.data = append(comboEvent.data, event.data)
i++;
} else {
out <- &comboEvent
// reset for next batch
comboEvent = Event{}
i=0;
}
case <-time.After(5 * time.Second):
// process whatever we have seen so far if the batch size isn't filled in 5 secs
out <- &comboEvent
// stop after
return
}
}
}()
return out
}