dowdw44426 2017-02-23 05:44

Queue processing with retries on failure

We have a bunch of files to be uploaded to remote blob store after processing.

Currently, the frontend (PHP) creates a Redis list of these files and gives it a unique ID, called the JobID. It then passes that ID to a beanstalk tube, where it is picked up by a Go process. The Go process uses a library called Go workers to handle each job ID, much as net/http handles each incoming request. It receives the job ID, retrieves the Redis list, and starts processing the files.

However, only one file is processed at a time right now. Since the operation is I/O bound rather than CPU bound, intuition suggests that it would be beneficial to use a goroutine per file.

We also want to retry uploads on failure and track the number of items processed per job. We cannot start an unbounded number of goroutines, because a single job can contain around 10k files and hundreds of such jobs can arrive per second at peak times. What would be the correct approach for this?

NB: We can change the technology stack a bit if needed (such as swapping out beanstalkd for something)


1 answer

  • douran9707 2017-02-23 06:16

    You can limit the number of goroutines by using a buffered channel whose capacity is the maximum number of goroutines you want running at once. A send on this channel blocks once it is full. As your goroutines finish, they receive from the channel and free up slots so that new goroutines can run.

    Example:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    var (
        concurrent    = 5
        semaphoreChan = make(chan struct{}, concurrent)
    )
    
    func doWork(wg *sync.WaitGroup, item int) {
        // block while full
        semaphoreChan <- struct{}{}
    
        go func() {
            defer func() {
                // read to release a slot
                <-semaphoreChan
                wg.Done()
            }()
            // This is where your work actually gets done
            fmt.Println(item)
        }()
    }
    
    func main() {
        // we need this for the example so that we can block until all goroutines finish
        var wg sync.WaitGroup
        wg.Add(10)
    
        // start the work
        for i := 0; i < 10; i++ {
            doWork(&wg, i)
        }
    
        // block until all work is done
        wg.Wait()
    }
    

    Go Playground link: https://play.golang.org/p/jDMYuCe7HV

    Inspired by this Golang UK Conference talk: https://youtu.be/yeetIgNeIkc?t=1413
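
    The question also asks about retries and per-job counts, which the example above does not cover. A minimal sketch of one way to add them on top of the same semaphore pattern follows. The names `uploadFunc`, `retry`, and `processJob`, the linear backoff, and the fake uploader in `main` are all assumptions made for illustration; the real upload call and retry policy would depend on your blob store.

    ```go
    package main

    import (
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )

    // uploadFunc is a hypothetical stand-in for the real blob-store upload call.
    type uploadFunc func(file string) error

    // retry calls upload up to attempts times, sleeping a little longer
    // after each failure (linear backoff, chosen only for illustration).
    func retry(upload uploadFunc, file string, attempts int, backoff time.Duration) error {
        var err error
        for i := 1; i <= attempts; i++ {
            if err = upload(file); err == nil {
                return nil
            }
            if i < attempts {
                time.Sleep(time.Duration(i) * backoff)
            }
        }
        return fmt.Errorf("%s: giving up after %d attempts: %w", file, attempts, err)
    }

    // processJob uploads files with at most limit uploads in flight and
    // returns how many files succeeded and how many failed after all retries.
    func processJob(files []string, limit, attempts int, backoff time.Duration, upload uploadFunc) (succeeded, failed int64) {
        sem := make(chan struct{}, limit)
        var wg sync.WaitGroup
        for _, f := range files {
            sem <- struct{}{} // block while limit uploads are already running
            wg.Add(1)
            go func(file string) {
                defer wg.Done()
                defer func() { <-sem }() // release the slot
                if err := retry(upload, file, attempts, backoff); err != nil {
                    atomic.AddInt64(&failed, 1)
                    return
                }
                atomic.AddInt64(&succeeded, 1)
            }(f)
        }
        wg.Wait() // after this, the counters are safe to read
        return succeeded, failed
    }

    func main() {
        // Fake uploader: every file fails on its first attempt and then
        // succeeds, except "bad", which always fails.
        var mu sync.Mutex
        seen := map[string]int{}
        flaky := func(file string) error {
            mu.Lock()
            seen[file]++
            n := seen[file]
            mu.Unlock()
            if file == "bad" || n == 1 {
                return errors.New("transient error")
            }
            return nil
        }

        files := []string{"a", "b", "c", "bad"}
        ok, failed := processJob(files, 2, 3, time.Millisecond, flaky)
        fmt.Printf("succeeded=%d failed=%d\n", ok, failed)
    }
    ```

    Note that the semaphore here is per job. With hundreds of jobs arriving per second you would probably want a single semaphore shared across all jobs, so that the limit applies to the whole process rather than to each job separately.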

    This answer was selected as the best answer by the asker.
