dongzhiman2162 2017-10-18 22:53
Viewed 127 · Accepted

Golang HTTP request worker pool

I'm trying to build a worker pool / job queue system to handle as many HTTP requests as possible on each API endpoint. I looked into this example and got it working just fine, except that I stumbled over the problem that I don't understand how to expand the pool / job queue to different endpoints.

For the sake of a scenario, let's sketch a Go HTTP server that handles a million requests per minute across different endpoints and request types (GET, POST, etc.).

How can I expand on this concept? Should I create different worker pools and jobs for each endpoint? Or can I create different jobs, enter them in the same queue, and have the same pool handle them all?

I want to maintain simplicity: when I create a new API endpoint, I don't want to have to create a new worker pool, so I can focus just on the API. But performance is also very much in mind.

The code I'm trying to build on is taken from the example linked earlier; here is a GitHub gist from somebody else with the same code.


3 answers

  • dti3914 2017-10-20 08:52

    One thing up front: if you are running an HTTP server (Go's standard server, anyway), you cannot control the number of goroutines without stopping and restarting the server. Each request starts at least one goroutine, and there's nothing you can do about that. The good news is that this is usually not a problem, since goroutines are so lightweight. However, it is perfectly reasonable to want to keep the number of goroutines that are doing hard work under control.

    You can put any value into a channel, including functions. So if the goal is to only have to write code in HTTP handlers, let the jobs be closures -- the workers don't know (or care) what they are working on.

    package main
    
    import (
        "encoding/json"
        "io/ioutil"
        "net/http"
    )
    
    var largePool chan func()
    var smallPool chan func()
    
    func main() {
        // Start two different sized worker pools (e.g., for different workloads).
    // Cancellation and graceful shutdown omitted for brevity.
    
        largePool = make(chan func(), 100)
        smallPool = make(chan func(), 10)
    
        for i := 0; i < 100; i++ {
                go func() {
                        for f := range largePool {
                                f()
                        }
                }()
        }
    
        for i := 0; i < 10; i++ {
                go func() {
                        for f := range smallPool {
                                f()
                        }
                }()
        }
    
        http.HandleFunc("/endpoint-1", handler1)
        http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay?
    
        http.ListenAndServe(":8080", nil)
    }
    
    func handler1(w http.ResponseWriter, r *http.Request) {
        // Imagine a JSON body containing a URL that we are expected to fetch.
        // Light work that doesn't consume many of *our* resources and can be done
    // in bulk, so we put it in the large pool.
        var job struct{ URL string }
    
        if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
        }
    
    go func() {
            largePool <- func() {
                    resp, err := http.Get(job.URL)
                    if err != nil {
                            return
                    }
                    defer resp.Body.Close()
                    // Do something with the response
            }
    }()
    
        w.WriteHeader(http.StatusAccepted)
    }
    
    func handler2(w http.ResponseWriter, r *http.Request) {
    // The request body is an image that we want to do some fancy processing
    // on. That's hard work; we don't want to do too many of them at once, so
    // we put those jobs in the small pool.
    
        b, err := ioutil.ReadAll(r.Body)
        if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
    
        go func() {
                smallPool <- func() {
                        processImage(b)
                }
        }()
        w.WriteHeader(http.StatusAccepted)
    }
    
    func processImage(b []byte) {}
    

    This is a very simple example to get the point across. It doesn't matter much how you set up your worker pools; you just need a clever job definition. In the example above it is a closure, but you could also define a Job interface, for instance.

    type Job interface {
        Do()
    }
    
    var largePool chan Job
    var smallPool chan Job
    

    Now, I wouldn't call the whole worker pool approach "simple". You said your goal is to limit the number of goroutines (that are doing work). That doesn't require workers at all; it only needs a limiter. Here is the same example as above, but using channels as semaphores to limit concurrency.

    package main
    
    import (
        "encoding/json"
        "io/ioutil"
        "net/http"
    )
    
    var largePool chan struct{}
    var smallPool chan struct{}
    
    func main() {
        largePool = make(chan struct{}, 100)
        smallPool = make(chan struct{}, 10)
    
        http.HandleFunc("/endpoint-1", handler1)
        http.HandleFunc("/endpoint-2", handler2)
    
        http.ListenAndServe(":8080", nil)
    }
    
    func handler1(w http.ResponseWriter, r *http.Request) {
        var job struct{ URL string }
    
        if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
        }
    
        go func() {
                // Block until there are fewer than cap(largePool) light-work
                // goroutines running.
                largePool <- struct{}{}
            defer func() { <-largePool }() // Let everyone know that we are done
    
            resp, err := http.Get(job.URL)
            if err != nil {
                    return
            }
            resp.Body.Close()
        }()
    
        w.WriteHeader(http.StatusAccepted)
    }
    
    func handler2(w http.ResponseWriter, r *http.Request) {
        b, err := ioutil.ReadAll(r.Body)
        if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
    
        go func() {
                // Block until there are fewer than cap(smallPool) hard-work
                // goroutines running.
                smallPool <- struct{}{}
            defer func() { <-smallPool }() // Let everyone know that we are done
    
                processImage(b)
        }()
    
        w.WriteHeader(http.StatusAccepted)
    }
    
    func processImage(b []byte) {}
    
    This answer was accepted by the asker.