dstwfcz1377 2019-06-01 09:03
浏览 46
已采纳

后台打印程序概念/ API和渠道:将服务传递到服务HTTP中的队列

Got some help here already that made me move abit forward in this concept im trying, but it still isnt quite working and i hit a conflict that i cant seem to get around.

I have tried here to illustrate what i want in a flowchart - please note that the client can be many many clients that will send in with printjobs therefore we cannot reply on the worker to be processing our job at that moment, but for most it will ( peak times it wont as the processing job of printing can take time ).

enter image description here

type QueueElement struct {
    jobid string
    rw   http.ResponseWriter
  doneChan chan struct{}
}

type GlobalVars struct {
    db   *sql.DB
    wg   sync.WaitGroup
    jobs chan QueueElement
}

func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        fmt.Printf ("incoming
")

            doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
            newPrintJob := QueueElement{
                    doneChan: doneC,    
                    jobid:    "jobid",
            }

            gv.jobs <- newPrintJob
            func(doneChan chan struct{},w http.ResponseWriter) {

                  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                    defer cancel()
                    select {
                    //If this triggers first, then this waiting goroutine would exit
                    //and nobody would be listeding the 'doneChan'. This is why it has to be buffered.
                    case <-ctx.Done():
                            fmt.Fprintf(w, "job is taking more than 5 seconds to complete
")
                            fmt.Printf ("took longer than 5 secs
")
                    case <-doneChan:
                            fmt.Fprintf(w, "instant reply from serveHTTP
")
                            fmt.Printf ("instant
")
                    }
            }(doneC,w)

    default:
            fmt.Fprintf(w, "No such Api")
    }
}

func worker(jobs <-chan QueueElement) {
    for {
            job := <-jobs
            processExec ("START /i /b try.cmd")
            fmt.Printf ("job done")
        //  processExec("START /i /b processAndPrint.exe -" + job.jobid)
            job.doneChan <- struct{}{}

    }
}

func main() {
      db, err := sql.Open("sqlite3", "jobs.sqlite")
      if err := db.Ping(); err != nil {
        log.Fatal(err)
    }   

      db.SetMaxOpenConns(1) // prevents locked database error
      _, err = db.Exec(setupSQL)
      if err != nil {
          log.Fatal(err)
      }

    // create a GlobalVars instance
    gv := GlobalVars{
        db  :   db,
        jobs: make(chan QueueElement),
    }
    go worker (gv.jobs)
    // create an http.Server instance and specify our job manager as
    // the handler for requests.
    server := http.Server{
        Handler: &gv,
        Addr : ":8888",
    }
    // start server and accept connections.
    log.Fatal(server.ListenAndServe())
}

The above code is of the serveHTTP and the worker with help from here, initially the func inside the ServeHTTP was a go routine and its here the whole conflict arises for me - the concept is that in the serveHTTP it spawns a process that will get reply from the worker if the worker were able to process the job in time, within 5 seconds.

If the job was able to finish in 1 second i want to reply back instantly after that 1 second to the client, if it takes 3 i want to reply after 3, if it takes more than 5 i will send reply back after 5 seconds ( if the job takes 13 seconds i still want to reply after 5 seconds) - that the client has to poll on the job from now on - but the conflict is :

a) When ServeHTTP exits - then the ResponseWriter closes - and to be able to reply back to the client, we have to write the answer to the ResponseWriter.

b) if i block serveHTTP (like in the code example below where i dont call the func as a go routine) then it affects not only that single API call but it seems that all others after that will be affected, so the first call in will be served correctly in time but the calls that would be coming in at the same time after the first will be sequentially delayed by the blocking operation.

c) if i dont block it ex - and change it to a go routine :

    gv.jobs <- newPrintJob
    go func(doneChan chan struct{},w http.ResponseWriter) {

Then theres no delay - can call in many api's - but problem is here serveHTTP exists right away and thereby kills ResponseWriter and then im not able to reply back to the client !

This is really causing me headache on how i can get around this conflict where i dont cause any blockage to serveHTTP so i can handle all requests parallel, but still able to reply back to the ResponseWriter in question.

Is there anyway of preventing serveHTTP of shutting down a responsewriter even though the function exits ?

  • 写回答

3条回答 默认 最新

  • doudan5136 2019-06-02 14:56
    关注

    I've add some updates to your code. Now it works as you've described.

    package main
    
    import (
        "database/sql"
        "fmt"
        "log"
        "math/rand"
        "net/http"
        "sync"
        "time"
    )
    
    type QueueElement struct {
        jobid    string
        rw       http.ResponseWriter
        doneChan chan struct{}
    }
    
    type GlobalVars struct {
        db   *sql.DB
        wg   sync.WaitGroup
        jobs chan QueueElement
    }
    
    func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    
        switch r.URL.Path {
        case "/StartJob":
            fmt.Printf("incoming
    ")
    
            doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
    
            go func(doneChan chan struct{}, w http.ResponseWriter) {
                gv.jobs <- QueueElement{
                    doneChan: doneC,
                    jobid:    "jobid",
                }
            }(doneC, w)
    
            select {
            case <-time.Tick(time.Second * 5):
                fmt.Fprintf(w, "job is taking more than 5 seconds to complete
    ")
                fmt.Printf("took longer than 5 secs
    ")
            case <-doneC:
                fmt.Fprintf(w, "instant reply from serveHTTP
    ")
                fmt.Printf("instant
    ")
            }
        default:
            fmt.Fprintf(w, "No such Api")
        }
    }
    
    func worker(jobs <-chan QueueElement) {
        for {
            job := <-jobs
            fmt.Println("START /i /b try.cmd")
            fmt.Printf("job done")
    
            randTimeDuration := time.Second * time.Duration(rand.Intn(7))
    
            time.Sleep(randTimeDuration)
    
            //  processExec("START /i /b processAndPrint.exe -" + job.jobid)
            job.doneChan <- struct{}{}
        }
    }
    
    func main() {
    
        // create a GlobalVars instance
        gv := GlobalVars{
            //db:   db,
            jobs: make(chan QueueElement),
        }
        go worker(gv.jobs)
        // create an http.Server instance and specify our job manager as
        // the handler for requests.
        server := http.Server{
            Handler: &gv,
            Addr:    ":8888",
        }
        // start server and accept connections.
        log.Fatal(server.ListenAndServe())
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(2条)

报告相同问题?

悬赏问题

  • ¥15 微信会员卡接入微信支付商户号收款
  • ¥15 如何获取烟草零售终端数据
  • ¥15 数学建模招标中位数问题
  • ¥15 phython路径名过长报错 不知道什么问题
  • ¥15 深度学习中模型转换该怎么实现
  • ¥15 HLs设计手写数字识别程序编译通不过
  • ¥15 Stata外部命令安装问题求帮助!
  • ¥15 从键盘随机输入A-H中的一串字符串,用七段数码管方法进行绘制。提交代码及运行截图。
  • ¥15 TYPCE母转母,插入认方向
  • ¥15 如何用python向钉钉机器人发送可以放大的图片?