dongzhidian3538 2016-06-09 20:06
浏览 27
已采纳

Golang非阻塞缓冲区

Synchronous Example:

type job struct {
    Id int
    Message string
}

for {
    // getJob() blocks until job is received
    job := getJob()
    doSomethingWithJob(job)
}

I wish to process jobs as they come in from getJob with doSomethingWithJob. e.g. getJob could be a payload received from a MessagingQueue such as RabbitMQ/Beanstalkd or handling a HTTP Request.

I don't want to block getJob whilst I am doSomethingWithJob & vice versa. I do however want to control / buffer the number of jobs so that I don't overload the system. e.g. max concurrency of 5.

The concept of go routines confuse me at the moment, so any pointers in the right direction would be much appreciated to assist me learn.

Update: Thanks @JimB for your help. Why is worker 5 always picking up the job?

jobCh := make(chan *job)

// Max 5 Workers
for i := 0; i < 5; i++ {

    go func() {

        for job := range jobCh {
            time.Sleep(time.Second * time.Duration(rand.Intn(3)))
            log.Println(i, string(job.Message))
        }
    }()
}

for {
    job, err := getJob()
    if err != nil {
        log.Println("Closing Channel")
        close(jobCh)
        break
    }

    jobCh <- job
}

log.Println("Complete")

Example output

2016/06/09 22:19:57 5 {"id":10692,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10687,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10699,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10701,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10703,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10704,"name":"Test Message"}
  • 写回答

1条回答 默认 最新

  • dtnpzghys01643322 2016-06-09 20:13
    关注

    You can start 5 goroutines reading from a channel to call doSomethingWithJob. That way there's never more than 5 jobs being processed concurrently.

    jobCh := make(chan *job)
    
    // start 5 workers to process jobs
    for i := 0; i < 5; i++ {
        go func() {
            for job := range jobCh {
                doSomethingWithJob(job)
            }
        }()
    }
    
    // send jobs to workers as fast as we can
    for {
        jobCh <- getJob()
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 急matlab编程仿真二阶震荡系统
  • ¥20 TEC-9的数据通路实验
  • ¥15 ue5 .3之前好好的现在只要是激活关卡就会崩溃
  • ¥50 MATLAB实现圆柱体容器内球形颗粒堆积
  • ¥15 python如何将动态的多个子列表,拼接后进行集合的交集
  • ¥20 vitis-ai量化基于pytorch框架下的yolov5模型
  • ¥15 如何实现H5在QQ平台上的二次分享卡片效果?
  • ¥15 python爬取bilibili校园招聘网站
  • ¥30 求解达问题(有红包)
  • ¥15 请解包一个pak文件