dstwfcz1377 2019-06-01 09:03
浏览 46
已采纳

后台打印程序概念/ API和渠道:将服务传递到服务HTTP中的队列

I already got some help here that moved me a bit forward with the concept I'm trying, but it still isn't quite working, and I've hit a conflict that I can't seem to get around.

I have tried to illustrate what I want in a flowchart. Please note that there can be many, many clients sending in print jobs, so we cannot rely on the worker being free to process our job at that moment; most of the time it will be, but at peak times it won't, as the processing job of printing can take time.

enter image description here

// QueueElement describes one print job passed from the HTTP handler to the
// worker through the jobs channel.
type QueueElement struct {
    // jobid identifies the print job.
    jobid string
    // rw is a response writer slot; nothing in the visible code assigns it.
    rw   http.ResponseWriter
    // doneChan receives one value from the worker after it has handled the job.
  doneChan chan struct{}
}

// GlobalVars bundles the state shared by the HTTP handler and the worker.
// It implements http.Handler through its ServeHTTP method.
type GlobalVars struct {
    // db is the database handle that main opens and stores here.
    db   *sql.DB
    // wg is not used anywhere in the visible code.
    wg   sync.WaitGroup
    // jobs is the queue of print jobs: ServeHTTP sends, worker receives.
    jobs chan QueueElement
}

func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        fmt.Printf ("incoming
")

            doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
            newPrintJob := QueueElement{
                    doneChan: doneC,    
                    jobid:    "jobid",
            }

            gv.jobs <- newPrintJob
            func(doneChan chan struct{},w http.ResponseWriter) {

                  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                    defer cancel()
                    select {
                    //If this triggers first, then this waiting goroutine would exit
                    //and nobody would be listeding the 'doneChan'. This is why it has to be buffered.
                    case <-ctx.Done():
                            fmt.Fprintf(w, "job is taking more than 5 seconds to complete
")
                            fmt.Printf ("took longer than 5 secs
")
                    case <-doneChan:
                            fmt.Fprintf(w, "instant reply from serveHTTP
")
                            fmt.Printf ("instant
")
                    }
            }(doneC,w)

    default:
            fmt.Fprintf(w, "No such Api")
    }
}

// worker handles queued jobs one at a time and signals each job's doneChan
// when it has been handled. It returns when jobs is closed.
func worker(jobs <-chan QueueElement) {
    // Ranging stops cleanly when jobs is closed. A bare receive in an endless
    // loop would instead yield zero-value elements and then block forever on
    // the send to their nil doneChan.
    for job := range jobs {
        // NOTE(review): processExec is defined elsewhere; presumably it runs
        // the command and waits for it to finish. Confirm.
        processExec("START /i /b try.cmd")
        fmt.Printf("job done")
        //  processExec("START /i /b processAndPrint.exe -" + job.jobid)

        // ServeHTTP creates doneChan with capacity 1, so this send does not
        // block even if the handler has already given up waiting.
        job.doneChan <- struct{}{}
    }
}

// main opens the jobs database, runs the setup SQL, starts the worker
// goroutine and serves HTTP on :8888 with GlobalVars as the handler.
func main() {
    // NOTE(review): a driver named "sqlite3" must be registered by an import
    // that is not visible here. Confirm.
    db, err := sql.Open("sqlite3", "jobs.sqlite")
    if err != nil {
        // Previously this error was shadowed by the Ping check and never
        // inspected.
        log.Fatal(err)
    }
    if err := db.Ping(); err != nil {
        log.Fatal(err)
    }

    db.SetMaxOpenConns(1) // prevents locked database error
    if _, err := db.Exec(setupSQL); err != nil {
        log.Fatal(err)
    }

    // create a GlobalVars instance
    gv := GlobalVars{
        db:   db,
        jobs: make(chan QueueElement),
    }
    go worker(gv.jobs)

    // create an http.Server instance and specify our job manager as
    // the handler for requests.
    server := http.Server{
        Handler: &gv,
        Addr:    ":8888",
    }

    // start server and accept connections.
    log.Fatal(server.ListenAndServe())
}

The code above shows ServeHTTP and the worker, written with help from here. Initially the func inside ServeHTTP was a goroutine, and this is where the whole conflict arises for me. The concept is that ServeHTTP spawns a process that gets a reply from the worker if the worker was able to process the job in time, i.e. within 5 seconds.

If the job finishes in 1 second, I want to reply to the client right after that 1 second; if it takes 3, I want to reply after 3; if it takes more than 5, I will reply after 5 seconds (even if the job takes 13 seconds, I still want to reply after 5 seconds), telling the client that it has to poll for the job from now on. But the conflict is:

a) When ServeHTTP exits - then the ResponseWriter closes - and to be able to reply back to the client, we have to write the answer to the ResponseWriter.

b) If I block ServeHTTP (as in the code example above, where I don't call the func as a goroutine), then it affects not only that single API call; it seems that all calls after it are affected too. The first call is served correctly and in time, but calls coming in at the same time after the first are sequentially delayed by the blocking operation.

c) If I don't block it, e.g. by changing it to a goroutine:

    gv.jobs <- newPrintJob
    go func(doneChan chan struct{},w http.ResponseWriter) {

Then there's no delay and I can make many API calls, but the problem is that ServeHTTP exits right away and thereby kills the ResponseWriter, so I'm not able to reply back to the client!

This is really giving me a headache: how can I get around this conflict so that I don't block ServeHTTP and can handle all requests in parallel, but am still able to reply through the ResponseWriter in question?

Is there any way to prevent ServeHTTP from shutting down a ResponseWriter even though the function exits?

  • 写回答

3条回答 默认 最新

  • doudan5136 2019-06-02 14:56
    关注

    I've added some updates to your code. Now it works as you've described.

    package main
    
    import (
        "database/sql"
        "fmt"
        "log"
        "math/rand"
        "net/http"
        "sync"
        "time"
    )
    
    // QueueElement describes one print job passed from the HTTP handler to the
    // worker through the jobs channel.
    type QueueElement struct {
        // jobid identifies the print job.
        jobid    string
        // rw is a response writer slot; nothing in the visible code assigns it.
        rw       http.ResponseWriter
        // doneChan receives one value from the worker after it has handled the job.
        doneChan chan struct{}
    }
    
    // GlobalVars bundles the state shared by the HTTP handler and the worker.
    // It implements http.Handler through its ServeHTTP method.
    type GlobalVars struct {
        // db is a database handle; main leaves it unset in this example.
        db   *sql.DB
        // wg is not used anywhere in the visible code.
        wg   sync.WaitGroup
        // jobs is the queue of print jobs: ServeHTTP sends, worker receives.
        jobs chan QueueElement
    }
    
    func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    
        switch r.URL.Path {
        case "/StartJob":
            fmt.Printf("incoming
    ")
    
            doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
    
            go func(doneChan chan struct{}, w http.ResponseWriter) {
                gv.jobs <- QueueElement{
                    doneChan: doneC,
                    jobid:    "jobid",
                }
            }(doneC, w)
    
            select {
            case <-time.Tick(time.Second * 5):
                fmt.Fprintf(w, "job is taking more than 5 seconds to complete
    ")
                fmt.Printf("took longer than 5 secs
    ")
            case <-doneC:
                fmt.Fprintf(w, "instant reply from serveHTTP
    ")
                fmt.Printf("instant
    ")
            }
        default:
            fmt.Fprintf(w, "No such Api")
        }
    }
    
    // worker handles queued jobs one at a time. Each job is simulated by
    // sleeping a random whole number of seconds in [0, 7), after which the
    // job's doneChan is signalled. It returns when jobs is closed.
    func worker(jobs <-chan QueueElement) {
        // Ranging stops cleanly when jobs is closed. A bare receive in an
        // endless loop would instead yield zero-value elements and then block
        // forever on the send to their nil doneChan.
        for job := range jobs {
            fmt.Println("START /i /b try.cmd")

            // Stand-in for the real print command.
            randTimeDuration := time.Second * time.Duration(rand.Intn(7))
            time.Sleep(randTimeDuration)
            //  processExec("START /i /b processAndPrint.exe -" + job.jobid)

            // Logged after the simulated work rather than before it.
            fmt.Printf("job done")

            // ServeHTTP creates doneChan with capacity 1, so this send does
            // not block even if the handler has already given up waiting.
            job.doneChan <- struct{}{}
        }
    }
    
    // main wires the job queue to a single worker goroutine and serves HTTP on
    // :8888 with the GlobalVars value as the handler for every request.
    func main() {
        // Shared state for handler and worker; db stays unset in this example.
        state := &GlobalVars{
            //db:   db,
            jobs: make(chan QueueElement),
        }

        // One worker drains the queue in the background.
        go worker(state.jobs)

        // The job manager itself handles all incoming requests.
        srv := &http.Server{
            Addr:    ":8888",
            Handler: state,
        }

        // ListenAndServe returns only with an error; log it and exit.
        log.Fatal(srv.ListenAndServe())
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(2条)

报告相同问题?

悬赏问题

  • ¥15 如何在炒股软件中,爬到我想看的日k线
  • ¥15 51单片机中C语言怎么做到下面类似的功能的函数(相关搜索:c语言)
  • ¥15 seatunnel 怎么配置Elasticsearch
  • ¥15 PSCAD安装问题 ERROR: Visual Studio 2013, 2015, 2017 or 2019 is not found in the system.
  • ¥15 (标签-MATLAB|关键词-多址)
  • ¥15 关于#MATLAB#的问题,如何解决?(相关搜索:信噪比,系统容量)
  • ¥500 52810做蓝牙接受端
  • ¥15 基于PLC的三轴机械手程序
  • ¥15 多址通信方式的抗噪声性能和系统容量对比
  • ¥15 winform的chart曲线生成时有凸起