dpleylxzx47207117 2016-05-24 16:03
浏览 79
已采纳

使用go例程对ajax请求进行排队

I have the following code:

var (
    WorkersNum int = 12
    HTTPAddr string = "127.0.0.1:8080"
    Delay = 3e9
)

var (
    RequestQueue = make(chan Request, 1024)
    WorkerQueue chan chan Request
)

type Request struct {
    Buf []byte
    Delay time.Duration
}

type Worker struct {
    ID          int
    Request     chan Request
    WorkerQueue chan chan Request
    QuitChan    chan bool
}

func main() {
    fmt.Println("Starting the dispatcher")
    StartDispatcher()

    fmt.Println("Registering the handler")
    http.HandleFunc("/", handleRequest)

    fmt.Println("HTTP server listening on", HTTPAddr)
    if err := http.ListenAndServe(HTTPAddr, nil); err != nil {
        fmt.Println(err.Error())
    }
}

func StartDispatcher() {
    WorkerQueue = make(chan chan Request, WorkersNum)

    for i := 0; i < WorkersNum; i++ {
        fmt.Println("Starting worker", i + 1)
        worker := NewWorker(i + 1, WorkerQueue)
        worker.Start()
    }

    go func() {
        for {
            select {
            case request := <-RequestQueue:
                    fmt.Println("Received requeust")
                    go func() {
                        worker := <-WorkerQueue
                        fmt.Println("Dispatching request")
                        worker <- request
                    }()
            }
        }
    }()
}

func NewWorker(id int, workerQueue chan chan Request) Worker {
    worker := Worker{
        ID:          id,
        Request:     make(chan Request),
        WorkerQueue: workerQueue,
        QuitChan:    make(chan bool),
    }
    return worker
}

func (w *Worker) Start() {
    go func() {
        for {
            w.WorkerQueue <- w.Request
            select {
            case request := <-w.Request:
                    fmt.Printf("worker%d: Received work request, delaying for %f seconds
", w.ID, request.Delay.Seconds())
                    time.Sleep(request.Delay)
                    writeToFile(request.Buf)
                    fmt.Printf("worker%d: Saved to file!
", w.ID)
                case <-w.QuitChan:
                    fmt.Printf("worker%d stopping
", w.ID)
                    return
            }
        }
    }()
}

func handleRequest(w http.ResponseWriter, r *http.Request) {
    // make sure it's POST
    if r.Method != "POST" {
        w.Header().Set("Allow", "POST")
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    // add cors
    w.Header().Set("Access-Control-Allow-Origin", "*")
    // retrieve
    buf, err := ioutil.ReadAll(r.Body)
    if err != nil {
        //http.Error(w, err, http.StatusBadRequest)
        return
    }
    request := Request{Buf: buf, Delay: Delay}
    RequestQueue <- request
    fmt.Println("Request queued")
}

I'm pretty new to go language and go routines - can you help me to understand how this code works?

First I call start() function on each worker which assigns Worker.Request to Worker.WorkerQueue - how can I assign empty channel to empty array of channels?

Then in StartDispatcher() I create go routine waiting for requests.

When request comes I add it to RequestQueue variable. What's next? Start() function should trigger, but case is waiting for w.Request. Which is not filled because it's RequestQueue variable that changes.

Could you give me some simple explanation?

  • 写回答

1条回答 默认 最新

  • doumengjing1500 2016-05-24 18:34
    关注

    And i dont like go func() {...} inside Worker.Start(), IMO Worker.Start() must be synchronous, then you must call it as go worker.Start() in StartDispatcher().

    How it works.

    In StartDispatcher() it creates workers in a loop, which in turn put their's input channel on WorkerQueue buffered channel (buffered channels works like arrays, but channels) and blocks waiting for requests. Then we start a new goroutine to serve incoming requests: pick 1st worker's input channel (worker variable) from buffered channel WorkerQueue and send request to it.

    Worker will pick it up, do the work, and go for the next cycle: put his input channel to WorkerQueue (yes, its the place where its done 1st time when StartDispatcher() starts).

    Anytime you may close workers QuitChan and worker will terminate in case <-w.QuitChan case (read from a closed channels returns immediately).

    Btw, your RequestQueue = make(chan Request, 1024) is also buffered channel, so writes to it does not block (unless its full).

    Hope it helps.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥50 求解vmware的网络模式问题 别拿AI回答
  • ¥24 EFS加密后,在同一台电脑解密出错,证书界面找不到对应指纹的证书,未备份证书,求在原电脑解密的方法,可行即采纳
  • ¥15 springboot 3.0 实现Security 6.x版本集成
  • ¥15 PHP-8.1 镜像无法用dockerfile里的CMD命令启动 只能进入容器启动,如何解决?(操作系统-ubuntu)
  • ¥30 请帮我解决一下下面六个代码
  • ¥15 关于资源监视工具的e-care有知道的嘛
  • ¥35 MIMO天线稀疏阵列排布问题
  • ¥60 用visual studio编写程序,利用间接平差求解水准网
  • ¥15 Llama如何调用shell或者Python
  • ¥20 谁能帮我挨个解读这个php语言编的代码什么意思?