duancenxie2233 2018-11-12 14:43
浏览 86
已采纳

等待通道中的N个项目,然后顺序执行

So I am very new to go! But I had this idea about something I wanted to try.

I would like to have a go routine that accepts strings from a channel but only after it has received N strings should it execute on them.

I looked around for similar questions or cases but I only found ones where the idea was to execute several routines in parallel and wait to aggregate the result.

I though about the idea of creating an array and just pass it to a routine where the length was sufficient. However I want to keep a certain separation of concerns and control this on the receiving end.

My questions are.

  1. Is this bad practice for some reason?
  2. Is there a better way to do this, what is it?

    func main() {
        ch := make(chan string)
        go func() {
            tasks := []string{}
            for {
                tasks = append(tasks,<- ch)
    
                if len(tasks) < 3 {
                    fmt.Println("Queue still to small")
                }
                if len(tasks) > 3 {
                    for i := 0; i < len(tasks); i++ {
                        fmt.Println(tasks[i])
                    }
                }
            }
        }()
    
        ch <- "Msg 1"
        time.Sleep(time.Second)
        ch <- "Msg 2"
        time.Sleep(time.Second)
        ch <- "Msg 3"
        time.Sleep(time.Second)
        ch <- "Msg 4"
        time.Sleep(time.Second)
    }
    

Edit for simpler more accurate example.

  • 写回答

2条回答 默认 最新

  • douwei1930 2018-11-12 16:44
    关注

    Based on a few comments, it looks like what you are looking for is some form of batching.

    Batching has a few scenarios when you would want to take the batch and send it along:

    1. Batch size is sufficient size
    2. Enough time has passed and a partial batch should be flushed

    Your given example does not account for the second scenario. This can lead to some awkward behavior if you just never flush because you quit getting load.

    Therefore I would recommend either looking into a library (e.g., cloudfoundry/go-batching) or simply use channels, a Timer and a select statement.

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
        ch := make(chan string)
        go func() {
            tasks := []string{}
            timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
            for {
                select {
                case <-timer.C:
                    fmt.Println("Flush partial batch due to time")
                    flush(tasks)
                    tasks = nil
                    timer.Reset(time.Second)
                case data := <-ch:
                    tasks = append(tasks, data)
    
                    // Reset the timer for each data point so that we only flush
                    // partial batches when we stop receiving data.
                    if !timer.Stop() {
                        <-timer.C
                    }
                    timer.Reset(time.Second)
    
                    // Guard clause to for batch size
                    if len(tasks) < 3 {
                        fmt.Println("Queue still too small")
                        continue
                    }
    
                    flush(tasks)
                    tasks = nil // reset tasks
                }
            }
        }()
    
        ch <- "Msg 1"
        time.Sleep(time.Second)
        ch <- "Msg 2"
        time.Sleep(time.Second)
        ch <- "Msg 3"
        time.Sleep(time.Second)
        ch <- "Msg 4"
        time.Sleep(time.Second)
    }
    
    func flush(tasks []string) {
        // Guard against emtpy flushes
        if len(tasks) == 0 {
            return
        }
    
        fmt.Println("Flush")
        for _, t := range tasks {
            fmt.Println(t)
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥50 如何用脚本实现输入法的热键设置
  • ¥20 我想使用一些网络协议或者部分协议也行,主要想实现类似于traceroute的一定步长内的路由拓扑功能
  • ¥30 深度学习,前后端连接
  • ¥15 孟德尔随机化结果不一致
  • ¥15 apm2.8飞控罗盘bad health,加速度计校准失败
  • ¥15 求解O-S方程的特征值问题给出边界层布拉休斯平行流的中性曲线
  • ¥15 谁有desed数据集呀
  • ¥20 手写数字识别运行c仿真时,程序报错错误代码sim211-100
  • ¥15 关于#hadoop#的问题
  • ¥15 (标签-Python|关键词-socket)