dongzhidian3538 2016-06-09 20:06
浏览 27
已采纳

Golang非阻塞缓冲区

Synchronous Example:

type job struct {
    Id int
    Message string
}

for {
    // getJob() blocks until job is received
    job := getJob()
    doSomethingWithJob(job)
}

I wish to process jobs as they come in from getJob with doSomethingWithJob. e.g. getJob could be a payload received from a MessagingQueue such as RabbitMQ/Beanstalkd or handling a HTTP Request.

I don't want to block getJob whilst I am doSomethingWithJob & vice versa. I do however want to control / buffer the number of jobs so that I don't overload the system. e.g. max concurrency of 5.

The concept of go routines confuse me at the moment, so any pointers in the right direction would be much appreciated to assist me learn.

Update: Thanks @JimB for your help. Why is worker 5 always picking up the job?

jobCh := make(chan *job)

// Max 5 Workers
for i := 0; i < 5; i++ {

    go func() {

        for job := range jobCh {
            time.Sleep(time.Second * time.Duration(rand.Intn(3)))
            log.Println(i, string(job.Message))
        }
    }()
}

for {
    job, err := getJob()
    if err != nil {
        log.Println("Closing Channel")
        close(jobCh)
        break
    }

    jobCh <- job
}

log.Println("Complete")

Example output

2016/06/09 22:19:57 5 {"id":10692,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10687,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10699,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10701,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10703,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10704,"name":"Test Message"}
  • 写回答

1条回答 默认 最新

  • dtnpzghys01643322 2016-06-09 20:13
    关注

    You can start 5 goroutines reading from a channel to call doSomethingWithJob. That way there's never more than 5 jobs being processed concurrently.

    jobCh := make(chan *job)
    
    // start 5 workers to process jobs
    for i := 0; i < 5; i++ {
        go func() {
            for job := range jobCh {
                doSomethingWithJob(job)
            }
        }()
    }
    
    // send jobs to workers as fast as we can
    for {
        jobCh <- getJob()
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 使用ue5插件narrative时如何切换关卡也保存叙事任务记录
  • ¥20 软件测试决策法疑问求解答
  • ¥15 win11 23H2删除推荐的项目,支持注册表等
  • ¥15 matlab 用yalmip搭建模型,cplex求解,线性化处理的方法
  • ¥15 qt6.6.3 基于百度云的语音识别 不会改
  • ¥15 关于#目标检测#的问题:大概就是类似后台自动检测某下架商品的库存,在他监测到该商品上架并且可以购买的瞬间点击立即购买下单
  • ¥15 神经网络怎么把隐含层变量融合到损失函数中?
  • ¥15 lingo18勾选global solver求解使用的算法
  • ¥15 全部备份安卓app数据包括密码,可以复制到另一手机上运行
  • ¥20 测距传感器数据手册i2c