dongzhiman2162 2017-10-18 22:53

Golang HTTP request worker pool

I'm trying to build a worker pool / job queue system to handle as many HTTP requests as possible on each API endpoint. I looked into this example and got it working just fine, except that I stumbled upon a problem: I don't understand how to expand the pool / job queue to different endpoints.

For scenario's sake, let's sketch a Golang HTTP server that handles a million requests per minute across different endpoints and request types (GET, POST, etc.).

How can I expand on this concept? Should I create different worker pools and jobs for each endpoint? Or can I create different jobs, enter them into the same queue, and have the same pool handle them?

I want to maintain simplicity: when I create a new API endpoint, I don't want to have to create new worker pools, so I can focus just on the API. But performance is also very much on my mind.

The code I'm trying to build on is taken from the example linked earlier; here is a GitHub gist of somebody else with this code.



  • dti3914 2017-10-20 08:52

    One thing up front: If you are running an HTTP server (Go's standard server anyway), you cannot control the number of goroutines without stopping and restarting the server. Each request starts at least one goroutine, and there's nothing you can do about that. The good news is that this is usually not a problem, since goroutines are so lightweight. However, it is perfectly reasonable that you want to keep the number of goroutines that are doing hard work under control.

    You can put any value into a channel, including functions. So if the goal is only having to write code in the HTTP handlers, let the jobs be closures -- the workers don't know (or care) what they are working on.

    package main
    
    import (
        "encoding/json"
        "io/ioutil"
        "net/http"
    )
    
    var largePool chan func()
    var smallPool chan func()
    
    func main() {
        // Start two different sized worker pools (e.g., for different workloads).
        // Cancellation and graceful shutdown omitted for brevity.
    
        largePool = make(chan func(), 100)
        smallPool = make(chan func(), 10)
    
        for i := 0; i < 100; i++ {
                go func() {
                        for f := range largePool {
                                f()
                        }
                }()
        }
    
        for i := 0; i < 10; i++ {
                go func() {
                        for f := range smallPool {
                                f()
                        }
                }()
        }
    
        http.HandleFunc("/endpoint-1", handler1)
        http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay?
    
        http.ListenAndServe(":8080", nil)
    }
    
    func handler1(w http.ResponseWriter, r *http.Request) {
        // Imagine a JSON body containing a URL that we are expected to fetch.
        // Light work that doesn't consume many of *our* resources and can be done
        // in bulk, so we put it in the large pool.
        var job struct{ URL string }
    
        if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
        }
    
        go func() {
                largePool <- func() {
                        http.Get(job.URL)
                        // Do something with the response
                }
        }()
    
        w.WriteHeader(http.StatusAccepted)
    }
    
    func handler2(w http.ResponseWriter, r *http.Request) {
        // The request body is an image that we want to do some fancy processing
        // on. That's hard work; we don't want to do too many of them at once,
        // so we put those jobs in the small pool.
    
        b, err := ioutil.ReadAll(r.Body)
        if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
    
        go func() {
                smallPool <- func() {
                        processImage(b)
                }
        }()
        w.WriteHeader(http.StatusAccepted)
    }
    
    func processImage(b []byte) {}
    

    This is a very simple example to get the point across. It doesn't matter much how you set up your worker pools. You just need a clever job definition. In the example above it is a closure, but you could also define a Job interface, for instance.

    type Job interface {
        Do()
    }
    
    var largePool chan Job
    var smallPool chan Job
    
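    To make the interface version concrete, here is a minimal, self-contained sketch (the `countJob` type and the worker/job counts are made up for illustration): workers range over a `chan Job` exactly as they ranged over `chan func()` above, and handlers can enqueue any type that satisfies `Do()`.

    ```go
    package main

    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    )

    // Job is the unit of work the pool understands; the workers never
    // need to know what a concrete job actually does.
    type Job interface {
    	Do()
    }

    // countJob is a hypothetical stand-in for real work: it just bumps a counter.
    type countJob struct{ counter *int64 }

    func (j countJob) Do() { atomic.AddInt64(j.counter, 1) }

    func main() {
    	pool := make(chan Job, 10)
    	var wg sync.WaitGroup

    	// Start 4 workers; each drains jobs from the shared channel.
    	for i := 0; i < 4; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			for j := range pool {
    				j.Do()
    			}
    		}()
    	}

    	var done int64
    	for i := 0; i < 20; i++ {
    		pool <- countJob{&done}
    	}
    	close(pool) // no more jobs; workers drain the channel and exit
    	wg.Wait()

    	fmt.Println("jobs processed:", done)
    }
    ```

    A handler would simply send its own `Job` implementation into `largePool` or `smallPool`; adding a new endpoint means adding a new job type, not a new pool.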

    Now, I wouldn't call the whole worker pool approach "simple". You said your goal is to limit the number of goroutines (that are doing work). That doesn't require workers at all; it only needs a limiter. Here is the same example as above, but using channels as semaphores to limit concurrency.

    package main
    
    import (
        "encoding/json"
        "io/ioutil"
        "net/http"
    )
    
    var largePool chan struct{}
    var smallPool chan struct{}
    
    func main() {
        largePool = make(chan struct{}, 100)
        smallPool = make(chan struct{}, 10)
    
        http.HandleFunc("/endpoint-1", handler1)
        http.HandleFunc("/endpoint-2", handler2)
    
        http.ListenAndServe(":8080", nil)
    }
    
    func handler1(w http.ResponseWriter, r *http.Request) {
        var job struct{ URL string }
    
        if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
        }
    
        go func() {
                // Block until there are fewer than cap(largePool) light-work
                // goroutines running.
                largePool <- struct{}{}
            defer func() { <-largePool }() // Let everyone know that we are done
    
                http.Get(job.URL)
        }()
    
        w.WriteHeader(http.StatusAccepted)
    }
    
    func handler2(w http.ResponseWriter, r *http.Request) {
        b, err := ioutil.ReadAll(r.Body)
        if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
    
        go func() {
                // Block until there are fewer than cap(smallPool) hard-work
                // goroutines running.
                smallPool <- struct{}{}
            defer func() { <-smallPool }() // Let everyone know that we are done
    
                processImage(b)
        }()
    
        w.WriteHeader(http.StatusAccepted)
    }
    
    func processImage(b []byte) {}
    
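    The limiter pattern is easy to verify in isolation. This small sketch (names and counts are made up for illustration) launches 20 goroutines through a semaphore of capacity 3 and tracks the peak number running inside the guarded section at once; the acquire/release pair is identical to the one in the handlers above.

    ```go
    package main

    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    )

    func main() {
    	const limit = 3
    	sem := make(chan struct{}, limit) // buffered channel used as a semaphore

    	var (
    		wg      sync.WaitGroup
    		current int64 // goroutines inside the guarded section right now
    		peak    int64 // highest value current ever reached
    	)

    	for i := 0; i < 20; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			sem <- struct{}{}        // acquire: blocks once limit goroutines hold a slot
    			defer func() { <-sem }() // release the slot when done

    			n := atomic.AddInt64(&current, 1)
    			for { // record the peak concurrency seen so far
    				p := atomic.LoadInt64(&peak)
    				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
    					break
    				}
    			}
    			atomic.AddInt64(&current, -1)
    		}()
    	}
    	wg.Wait()

    	fmt.Println("peak concurrency within limit:", peak <= limit)
    }
    ```

    At most `limit` goroutines can be between the acquire and the release at any moment, so the observed peak never exceeds the channel's capacity.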
    This answer was selected by the asker as the best answer.