dowdw44426 2017-02-23 05:44

Queue processing with retries on failure

We have a batch of files that need to be uploaded to a remote blob store after processing.

Currently, the frontend (PHP) creates a Redis list of these files and assigns it a unique ID, called the JobID. It then pushes that ID onto a beanstalk tube, which is consumed by a Go process. The Go process uses a library called Go workers to handle each job ID, in much the same way net/http handles requests: it receives the job ID, retrieves the Redis list, and starts processing the files.

At the moment, however, only one file is processed at a time. Since the operation is I/O bound rather than CPU bound, it seems it would be beneficial to use a goroutine per file.

We also want to retry uploads on failure and track the number of items processed per job. We cannot start an unbounded number of goroutines, because a single job can contain about 10k files and hundreds of such jobs can be sent per second at peak times. What would be the correct approach for this?

NB: We can change the technology stack a bit if needed (for example, swapping beanstalkd for something else).


1 answer

  • douran9707 2017-02-23 06:16

    You can limit the number of goroutines with a buffered channel whose capacity is the maximum number of goroutines you want running at once. Sending on the channel blocks once it is full; as goroutines finish, they receive from the channel, which frees slots so that new goroutines can start.

    Example:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    var (
        concurrent    = 5
        semaphoreChan = make(chan struct{}, concurrent)
    )
    
    func doWork(wg *sync.WaitGroup, item int) {
        // block while full
        semaphoreChan <- struct{}{}
    
        go func() {
            defer func() {
                // read to release a slot
                <-semaphoreChan
                wg.Done()
            }()
            // This is where your work actually gets done
            fmt.Println(item)
        }()
    }
    
    func main() {
        // we need this for the example so that we can block until all goroutines finish
        var wg sync.WaitGroup
        wg.Add(10)
    
        // start the work
        for i := 0; i < 10; i++ {
            doWork(&wg, i)
        }
    
        // block until all work is done
        wg.Wait()
    }
    

    Go Playground link: https://play.golang.org/p/jDMYuCe7HV

    Inspired by this Golang UK Conference talk: https://youtu.be/yeetIgNeIkc?t=1413

    Selected by the asker as the best answer.
