I have the following code:
var (
WorkersNum int = 12
HTTPAddr string = "127.0.0.1:8080"
Delay = 3e9
)
var (
RequestQueue = make(chan Request, 1024)
WorkerQueue chan chan Request
)
type Request struct {
Buf []byte
Delay time.Duration
}
type Worker struct {
ID int
Request chan Request
WorkerQueue chan chan Request
QuitChan chan bool
}
func main() {
fmt.Println("Starting the dispatcher")
StartDispatcher()
fmt.Println("Registering the handler")
http.HandleFunc("/", handleRequest)
fmt.Println("HTTP server listening on", HTTPAddr)
if err := http.ListenAndServe(HTTPAddr, nil); err != nil {
fmt.Println(err.Error())
}
}
func StartDispatcher() {
WorkerQueue = make(chan chan Request, WorkersNum)
for i := 0; i < WorkersNum; i++ {
fmt.Println("Starting worker", i + 1)
worker := NewWorker(i + 1, WorkerQueue)
worker.Start()
}
go func() {
for {
select {
case request := <-RequestQueue:
fmt.Println("Received requeust")
go func() {
worker := <-WorkerQueue
fmt.Println("Dispatching request")
worker <- request
}()
}
}
}()
}
func NewWorker(id int, workerQueue chan chan Request) Worker {
worker := Worker{
ID: id,
Request: make(chan Request),
WorkerQueue: workerQueue,
QuitChan: make(chan bool),
}
return worker
}
func (w *Worker) Start() {
go func() {
for {
w.WorkerQueue <- w.Request
select {
case request := <-w.Request:
fmt.Printf("worker%d: Received work request, delaying for %f seconds
", w.ID, request.Delay.Seconds())
time.Sleep(request.Delay)
writeToFile(request.Buf)
fmt.Printf("worker%d: Saved to file!
", w.ID)
case <-w.QuitChan:
fmt.Printf("worker%d stopping
", w.ID)
return
}
}
}()
}
func handleRequest(w http.ResponseWriter, r *http.Request) {
// make sure it's POST
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// add cors
w.Header().Set("Access-Control-Allow-Origin", "*")
// retrieve
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
//http.Error(w, err, http.StatusBadRequest)
return
}
request := Request{Buf: buf, Delay: Delay}
RequestQueue <- request
fmt.Println("Request queued")
}
I'm pretty new to go language and go routines - can you help me to understand how this code works?
First I call start() function on each worker which assigns Worker.Request to Worker.WorkerQueue - how can I assign empty channel to empty array of channels?
Then in StartDispatcher() I create go routine waiting for requests.
When request comes I add it to RequestQueue variable. What's next? Start() function should trigger, but case is waiting for w.Request. Which is not filled because it's RequestQueue variable that changes.
Could you give me some simple explanation?