dto52236 2014-10-07 02:17
浏览 52
已采纳

进入频道非阻塞多个接收

Everywhere seems to discuss that reading from a channel should always be a blocking operation. The attitude seems to be this is the Go way. This makes some sense but I'm trying to figure out how I would aggregate things from channels.

For example, sending http requests. Say I have a pipeline setup that generates streams of data, so I have a channel that produces queue/stream of points. I could then have a goroutine listen to this channel and send a HTTP Request to store it in a service. This works, but I'm creating a http request for every point.

The endpoint I'm sending it too allows me to send multiple data points in a batch. What I would like to do, is

  1. Read as many values until I would block on channel.
  2. Combine them/send single http request.
  3. Then block on channel until I can read one again.

This is how I would've done things in C, with threadsafe queues and select statements. Basically flushing the entire/queue buffer when possible. Is this a valid technique in go?

It seems the go select statement does give me something similar to C's select, but I'm still not sure if there is a 'nonblocking read' on channels.

EDIT: I'm also willing to accept what I'm intending may not be the Go Way, but constantly smashing off non stop http requests also seems wrong to me, especially if they can be aggregated. If someone has an alternative architecture that will be cool, but I want to avoid things like, magically buffering N items, or waiting X seconds until sending.

  • 写回答

2条回答 默认 最新

  • douran6443 2014-10-07 06:52
    关注

    Here's how to batch until the channel is empty. The variable batch is a slice of your data point type. The variable ch is a channel of your data point type.

    var batch []someType
    for {
        select {
        case v := <-ch:
           batch = append(batch, v)
        default:
           if len(batch) > 0 {
               sendBatch(batch)
               batch := batch[:0]
           }
           batch = append(batch, <-ch)  // receiving a value here prevents busy waiting.
        }
    }
    

    You should prevent the batch from growing without limit. Here's a simple way to do it:

    var batch []someType
    for {
        select {
        case v := <-ch:
           batch = append(batch, v)
           if len(batch) >= batchLimit {
               sendBatch(batch)
               batch := batch[:0]
           }
        default:
           if len(batch) > 0 {
               sendBatch(batch)
               batch := batch[:0]
           }
           batch = append(batch, <-ch)
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥15 R语言Rstudio突然无法启动
  • ¥15 关于#matlab#的问题:提取2个图像的变量作为另外一个图像像元的移动量,计算新的位置创建新的图像并提取第二个图像的变量到新的图像
  • ¥15 改算法,照着压缩包里边,参考其他代码封装的格式 写到main函数里
  • ¥15 用windows做服务的同志有吗
  • ¥60 求一个简单的网页(标签-安全|关键词-上传)
  • ¥35 lstm时间序列共享单车预测,loss值优化,参数优化算法
  • ¥15 Python中的request,如何使用ssr节点,通过代理requests网页。本人在泰国,需要用大陆ip才能玩网页游戏,合法合规。
  • ¥100 为什么这个恒流源电路不能恒流?
  • ¥15 有偿求跨组件数据流路径图
  • ¥15 写一个方法checkPerson,入参实体类Person,出参布尔值