dpleylxzx47207117 2016-05-24 16:03
Views: 79
Accepted

Queuing AJAX requests using goroutines

I have the following code:

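package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "time"
)

var (
    WorkersNum int    = 12
    HTTPAddr   string = "127.0.0.1:8080"
    // Typed as time.Duration so it can be assigned to Request.Delay;
    // a bare 3e9 would make this variable a float64.
    Delay = 3 * time.Second
)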

var (
    RequestQueue = make(chan Request, 1024)
    WorkerQueue chan chan Request
)

type Request struct {
    Buf []byte
    Delay time.Duration
}

type Worker struct {
    ID          int
    Request     chan Request
    WorkerQueue chan chan Request
    QuitChan    chan bool
}

func main() {
    fmt.Println("Starting the dispatcher")
    StartDispatcher()

    fmt.Println("Registering the handler")
    http.HandleFunc("/", handleRequest)

    fmt.Println("HTTP server listening on", HTTPAddr)
    if err := http.ListenAndServe(HTTPAddr, nil); err != nil {
        fmt.Println(err.Error())
    }
}

func StartDispatcher() {
    WorkerQueue = make(chan chan Request, WorkersNum)

    for i := 0; i < WorkersNum; i++ {
        fmt.Println("Starting worker", i + 1)
        worker := NewWorker(i + 1, WorkerQueue)
        worker.Start()
    }

    go func() {
        for {
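            select {
            case request := <-RequestQueue:
                fmt.Println("Received request")
                // Hand off in a separate goroutine so the dispatcher
                // loop is not blocked while waiting for a free worker.
                go func() {
                    worker := <-WorkerQueue
                    fmt.Println("Dispatching request")
                    worker <- request
                }()
            }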
        }
    }()
}

func NewWorker(id int, workerQueue chan chan Request) Worker {
    worker := Worker{
        ID:          id,
        Request:     make(chan Request),
        WorkerQueue: workerQueue,
        QuitChan:    make(chan bool),
    }
    return worker
}

func (w *Worker) Start() {
    go func() {
        for {
            // Register this worker's input channel as available.
            w.WorkerQueue <- w.Request
            select {
            case request := <-w.Request:
                fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, request.Delay.Seconds())
                time.Sleep(request.Delay)
                writeToFile(request.Buf)
                fmt.Printf("worker%d: Saved to file!\n", w.ID)
            case <-w.QuitChan:
                fmt.Printf("worker%d stopping\n", w.ID)
                return
            }
        }
    }()
}

func handleRequest(w http.ResponseWriter, r *http.Request) {
    // make sure it's POST
    if r.Method != "POST" {
        w.Header().Set("Allow", "POST")
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    // add cors
    w.Header().Set("Access-Control-Allow-Origin", "*")
    // retrieve
    buf, err := ioutil.ReadAll(r.Body)
    if err != nil {
        // http.Error expects a string, so pass err.Error() rather than err.
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    request := Request{Buf: buf, Delay: Delay}
    RequestQueue <- request
    fmt.Println("Request queued")
}

I'm pretty new to Go and goroutines. Can you help me understand how this code works?

First I call Start() on each worker, which assigns Worker.Request to Worker.WorkerQueue. How can I assign an empty channel to an empty array of channels?

Then in StartDispatcher() I create a goroutine that waits for requests.

When a request comes in, I add it to RequestQueue. What happens next? Start() should be triggered, but its case is waiting on w.Request, which is not filled, because it is RequestQueue that changes.

Could you give me a simple explanation?

1 answer

  • doumengjing1500 2016-05-24 18:34

    Also, I don't like the go func() {...} inside Worker.Start(). IMO Worker.Start() should be synchronous, and you should call it as go worker.Start() in StartDispatcher().
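    To make that suggestion concrete, here is a minimal sketch of a synchronous Start() that the caller runs in a goroutine. It is not the asker's program: Job, runPool, and the handle callback are hypothetical names made up for this illustration, and a single shared Quit channel is assumed instead of one QuitChan per worker.

```go
package main

import (
	"fmt"
	"sync"
)

// Job is a hypothetical stand-in for the Request type in the question.
type Job struct{ Payload string }

// Worker mirrors the question's Worker, with shortened field names.
type Worker struct {
	ID          int
	Jobs        chan Job
	WorkerQueue chan chan Job
	Quit        chan struct{}
}

// Start is synchronous: it blocks until Quit is closed.
func (w *Worker) Start(handle func(id int, j Job)) {
	for {
		w.WorkerQueue <- w.Jobs // register as available
		select {
		case j := <-w.Jobs:
			handle(w.ID, j)
		case <-w.Quit:
			return
		}
	}
}

// runPool starts n workers, dispatches every payload, then shuts the
// pool down. It returns how many jobs were handled.
func runPool(n int, payloads []string) int {
	queue := make(chan chan Job, n) // room for every worker's channel
	quit := make(chan struct{})

	var mu sync.Mutex
	handled := 0
	var jobs, workers sync.WaitGroup

	handle := func(id int, j Job) {
		mu.Lock()
		handled++
		mu.Unlock()
		jobs.Done()
	}

	for i := 1; i <= n; i++ {
		w := &Worker{ID: i, Jobs: make(chan Job), WorkerQueue: queue, Quit: quit}
		workers.Add(1)
		go func() { // the caller decides that Start runs concurrently
			defer workers.Done()
			w.Start(handle)
		}()
	}

	for _, p := range payloads {
		jobs.Add(1)
		free := <-queue // wait for an available worker
		free <- Job{Payload: p}
	}

	jobs.Wait()    // every job has been handled
	close(quit)    // tell all workers to stop
	workers.Wait() // every Start call has returned
	return handled
}

func main() {
	fmt.Println("jobs handled:", runPool(3, []string{"a", "b", "c", "d"}))
}
```

    Because Start() blocks until the worker stops, the caller can wait for it to return, which is harder when the goroutine is hidden inside the method. A plain go w.Start(handle) works just as well; the wrapper function here exists only so the WaitGroup can track completion.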

    How it works.

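    StartDispatcher() creates the workers in a loop. Each worker puts its own input channel on the buffered WorkerQueue channel and then blocks, waiting for a request. (A buffered channel behaves like a fixed-capacity FIFO queue, and its elements can themselves be channels; nothing is "assigned" to an array, the channel value is simply sent.) StartDispatcher() then starts a goroutine to serve incoming requests: for each request taken from RequestQueue, it takes the first available worker's input channel (the worker variable) from WorkerQueue and sends the request to it.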
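    The channel-of-channels hand-off can be reduced to a few lines. This is only a sketch with made-up names (roundTrip, inbox, done), using string messages instead of Request values:

```go
package main

import "fmt"

// roundTrip sends msg to a "worker" through a channel of channels
// and returns what the worker received.
func roundTrip(msg string) string {
	workerQueue := make(chan chan string, 1) // buffered: holds worker inboxes
	inbox := make(chan string)               // unbuffered: one worker's input
	done := make(chan string)

	go func() {
		workerQueue <- inbox // the channel value itself is the element sent
		done <- <-inbox      // block until work arrives, then report it
	}()

	target := <-workerQueue // same channel value as inbox
	target <- msg           // so this send is received by the worker
	return <-done
}

func main() {
	fmt.Println(roundTrip("hello"))
}
```

    The dispatcher never needs to know which worker it got: whatever channel comes out of workerQueue belongs to a worker that is currently waiting on it.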

    The worker picks the request up, does the work, and starts the next cycle: it puts its input channel back on WorkerQueue (yes, this is the same line that runs the first time, when StartDispatcher() starts the worker).

    At any time you can close a worker's QuitChan, and the worker will terminate in the case <-w.QuitChan branch the next time it reaches the select (a receive from a closed channel returns immediately).
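    A small sketch of why close works as a stop signal. It assumes one quit channel shared by all goroutines (the question gives each worker its own QuitChan), and stopAll is a hypothetical helper written for this example:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stopAll starts n idle goroutines, closes the shared quit channel,
// and returns how many of them left through the quit case.
func stopAll(n int) int {
	quit := make(chan struct{})
	jobs := make(chan int) // nothing is ever sent here in this demo

	var wg sync.WaitGroup
	var stopped int32

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case <-jobs:
				// would process a job
			case <-quit:
				// a receive on a closed channel never blocks
				atomic.AddInt32(&stopped, 1)
			}
		}()
	}

	close(quit) // one close wakes every goroutine waiting on quit
	wg.Wait()
	return int(atomic.LoadInt32(&stopped))
}

func main() {
	fmt.Println("workers stopped:", stopAll(5))
}
```

    Sending a value on quit would wake only one receiver; closing the channel is what lets a single call reach all of them.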

    Btw, your RequestQueue = make(chan Request, 1024) is also a buffered channel, so writes to it do not block (unless it is full).
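    The "unless it is full" part can be observed with a non-blocking send. trySend is a hypothetical helper for this illustration, and the capacity of 2 is chosen only to keep the example short:

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full, so a plain send would block here.
		return false
	}
}

func main() {
	queue := make(chan int, 2)
	fmt.Println(trySend(queue, 1)) // true: buffer has room
	fmt.Println(trySend(queue, 2)) // true: buffer is now full
	fmt.Println(trySend(queue, 3)) // false: a plain send would block
	<-queue                        // free one slot
	fmt.Println(trySend(queue, 4)) // true again
}
```

    In the handler from the question, the plain RequestQueue <- request would instead make the HTTP request wait once 1024 requests are already queued.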

    Hope it helps.
