dpd3982 2019-05-31 06:10
Views: 72
Answer accepted

How do I correctly implement a delayed reply/timeout between ServeHTTP and a channel?

I have a design problem here, and I don't know how to solve it correctly in Go with minimal impact on the system.

I'm building a 'print spooler' where clients call an API (/StartJob) to submit print jobs.

There is only one printer, so the bottleneck is a single worker that processes one job at a time. Clients can submit a job at any time; it is simply queued, and the worker processes the jobs one by one, each taking as long as it takes.

ServeHTTP pushes the job onto a channel (note that I only pass the ID; the worker looks up the print data from it):

func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        newPrintJob := QueueElement{jobid: "jobid"}
        gv.printQueue <- newPrintJob
        fmt.Fprintf(w, "instant reply from serveHTTP\n")

    default:
        fmt.Fprintf(w, "No such Api")
    }
}

The worker then runs continuously and processes any incoming jobs. The real code does a bit more, but in the end it executes an external process:

func worker(jobs <-chan QueueElement) {
    // Receive jobs one at a time until the channel is closed.
    for job := range jobs {
        processExec("START /i /b processAndPrint.exe -" + job.jobid)
    }
}

The external process can take time to execute: sometimes it's instant, but under some circumstances it can take a minute before it returns.

My problem is that ServeHTTP currently writes back to the client immediately, without knowing whether the job was first in line and could be processed right away, or whether it was queued and may be seconds or minutes away from being processed:

  fmt.Fprintf(w, "instant reply from serveHTTP\n")

What I would like is to give the client up to 5 seconds to get a reply if their job was processed within that time and, if not, tell them to call back later to check the status of the job.
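The "call back later" half of this needs somewhere to record job states. A minimal sketch of such a table follows; the statusStore type, its methods, and the state strings are hypothetical names for illustration, not part of the code above.

```go
package main

import (
	"fmt"
	"sync"
)

// statusStore is a hypothetical in-memory table of job states,
// safe for concurrent use by the HTTP handlers and the worker.
type statusStore struct {
	mu     sync.RWMutex
	states map[string]string
}

func newStatusStore() *statusStore {
	return &statusStore{states: make(map[string]string)}
}

// Set records the latest state of a job (for example "queued" or "done").
func (s *statusStore) Set(id, state string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.states[id] = state
}

// Get returns the state of a job, or "unknown" if the ID was never recorded.
func (s *statusStore) Get(id string) string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if st, ok := s.states[id]; ok {
		return st
	}
	return "unknown"
}

func main() {
	store := newStatusStore()
	store.Set("42", "queued") // the handler would do this when enqueueing
	store.Set("42", "done")   // the worker would do this after the process returns
	fmt.Println(store.Get("42"), store.Get("7")) // prints "done unknown"
}
```

A status endpoint would then only need to call Get with the job ID the client was given.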

I had several approaches in mind:

  1. In my QueueElement I pass along the http.ResponseWriter, so the worker can write to it (reply to the client) directly. This only works if I make ServeHTTP wait, because the ResponseWriter is shut down when the handler goroutine exits. So ServeHTTP would have to wait, and while it waits the worker is allowed to write to the ResponseWriter.

    The problem with this approach is that if the job is minutes away, the worker won't write anything to that ResponseWriter, and ServeHTTP wouldn't know whether the worker had sent a reply.

  2. I could create a channel for each QueueElement, so that ServeHTTP can communicate not only with the worker but with the specific job being processed by the worker.

    I haven't tested this approach, but I worry that it is overkill and heavy on the system: we could have a great many API requests coming in, and therefore a big queue being processed. So even though I would time out/cancel after 5 seconds, I think the concept is overkill?

  3. I could pass a mutex-protected value in the QueueElement that ServeHTTP could check for up to 5 seconds and the worker could check/update. But once a job finishes the QueueElement disappears, so this might result in conflicts.

  4. I could do a variation of no. 1 where I write my own ResponseWriter with a flag that records whether something has already been written to it. ServeHTTP would check that flag for up to 5 seconds: if the worker has already replied to the client, ServeHTTP exits without answering; if nothing has been written, ServeHTTP writes the message back to the client. A bit along the lines of this.

But none of these feel very smooth to me, and I don't want to launch an unbounded number of goroutines or channels, or lock myself into mutexes everywhere, as I don't know what impact that has on the system.

Can anyone help me with a correct, working way to implement this? I've been reading page after page and haven't found a nice, clean way of achieving it.


  • doulingzou1712 2019-05-31 07:35

    I think the easiest approach is a slightly modified version of the first one. You can pass the http.ResponseWriter to the worker, which spawns another goroutine that actually carries out the job, while the "parent" worker waits for its completion or a timeout. It replies to the HTTP client as soon as either event occurs.

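    func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {

        switch r.URL.Path {
        case "/StartJob":
            // 'replied' is an extra field: the worker closes it once it has written
            // the response. ServeHTTP must not return before that, because w may
            // not be used after the handler has returned.
            newPrintJob := QueueElement{writer: w, jobid: "jobid", replied: make(chan struct{})}
            gv.printQueue <- newPrintJob
            <-newPrintJob.replied

        default:
            fmt.Fprintf(w, "No such Api")
        }
    }

    func worker(jobs <-chan QueueElement) {
        for job := range jobs {
            done := make(chan struct{})

            go func(d chan struct{}, q QueueElement) {
                processExec("START /i /b processAndPrint.exe -" + q.jobid)
                close(d) // closing never blocks, even if nobody is waiting
            }(done, job)

            select {
            // Timeout
            case <-time.After(time.Second * 5):
                fmt.Fprintf(job.writer, "job is taking more than 5 seconds to complete\n")
                close(job.replied)
                <-done // single printer: let this job finish before taking the next one
            // Job processing finished in time
            case <-done:
                fmt.Fprintf(job.writer, "instant reply from serveHTTP\n")
                close(job.replied)
            }
        }
    }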
    

    You can also spawn the "waiting" goroutine as soon as you receive the HTTP request. That way the timeout timer covers the entire handling of the request/job, including the time it spends in the queue.

    Example:

    package main

    import (
        "context"
        "fmt"
        "net/http"
        "time"
    )

    func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {

        switch r.URL.Path {
        case "/StartJob":
            doneC := make(chan struct{}, 1) // Buffered channel in order not to block the worker routine
            newPrintJob := QueueElement{
                doneChan: doneC,
                jobid:    "jobid",
            }
            replied := make(chan struct{}) // closed once the reply has been written
            go func(doneChan chan struct{}) {
                defer close(replied)
                // r.Context() is also cancelled if the client disconnects.
                ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
                defer cancel()
                select {
                // If this triggers first, then this waiting goroutine would exit
                // and nobody would be listening on 'doneChan'. This is why it has to be buffered.
                case <-ctx.Done():
                    fmt.Fprintf(w, "job is taking more than 5 seconds to complete\n")
                case <-doneChan:
                    fmt.Fprintf(w, "instant reply from serveHTTP\n")
                }
            }(doneC)
            gv.printQueue <- newPrintJob
            // w must not be used after ServeHTTP returns, so wait for the reply.
            // This assumes printQueue is buffered, so the send above does not block for long.
            <-replied

        default:
            fmt.Fprintf(w, "No such Api")
        }
    }

    func worker(jobs <-chan QueueElement) {
        for job := range jobs {
            processExec("START /i /b processAndPrint.exe -" + job.jobid)
            job.doneChan <- struct{}{}
        }
    }
    
    This answer was selected as the best answer by the asker.