I have an application running on a server which takes requests from a phone app and then load balances the request across worker servers. I'm trying to add a timeout in the case that messages on the main server that have been in the outbound queue for the length of the timeout are removed from the queue. More specifically, the application on the main server is written in golang and implements the Paranoid Pirate Pattern of load balancing. The code I currently have is:
import (
"fmt"
zmq "github.com/pebbe/zmq4"
"time"
)
const (
HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1500 * time.Millisecond
MESSAGE_READY = "\001"
MESSAGE_HEARTBEAT = "\002"
)
var (
client *zmq.Socket
backend *zmq.Socket
frontend *zmq.Socket
workerPoller *zmq.Poller
brokerPoller *zmq.Poller
workerQueue []Worker
)
type Worker struct {
Id string
Expire time.Time
}
type RequestWrapper {
RequestToSend Request
}
func NewWorker(id string) Worker {
return Worker{
Id: id,
Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
}
}
func AddReadyWorker(workers []Worker, worker Worker) []Worker {
fmt.Println(worker.Id, " joined")
for i, w := range workers {
if worker.Id == w.Id {
if i == 0 {
workers = workers[1:]
} else if i == len(workers)-1 {
workers = workers[:i]
} else {
workers = append(workers[:i], workers[i+1:]...)
}
break
}
}
return append(workers, worker)
}
func PurgeInactiveWorkers() {
now := time.Now()
for i, worker := range workerQueue {
if now.Before(worker.Expire) {
workerQueue = workerQueue[i:]
return
}
}
workerQueue = workerQueue[0:0]
}
func LoadBalance() {
// Loop:
heartbeat := time.Tick(HEARTBEAT_INTERVAL)
for {
var sockets []zmq.Polled
// If you have available workers, poll on the both front and backend
// If not poll on backend with infinite timeout
if len(workerQueue) > 0 {
sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
} else {
sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
}
for _, socket := range sockets {
switch socket.Socket {
// backend is a router
case backend:
workerId, _ := backend.Recv(0)
workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
clientId, _ := backend.Recv(0)
if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
route, _ := backend.Recv(0)
message, _ := backend.RecvBytes(0)
fmt.Println("Received response")
RouteResponse(route, message)
// frontend.Send(clientId, zmq.SNDMORE)
// frontend.Send("", zmq.SNDMORE)
// frontend.SendBytes(message, 0)
}
// frontend is a dealer
case frontend:
clientId, _ := frontend.Recv(0)
route, _ := frontend.Recv(0)
message, _ := frontend.RecvBytes(0)
backend.Send(workerQueue[0].Id, zmq.SNDMORE)
backend.Send(clientId, zmq.SNDMORE)
backend.Send(route, zmq.SNDMORE)
backend.SendBytes(message, 0)
workerQueue = workerQueue[1:]
}
}
select {
case <-heartbeat:
for _, worker := range workerQueue {
backend.Send(worker.Id, zmq.SNDMORE)
backend.Send(MESSAGE_HEARTBEAT, 0)
}
break
default:
}
PurgeInactiveWorkers()
}
}
If the backend sends a message, but it is not actually sent to a worker in some period of time, I want it to expire and not ever be sent. Is there a socket option that can accomplish this? If not, what would I have to do to accomplish this?
Two ways I think I can do this without socket options are:
1) Have the backend wrap the message in a wrapper and send to a golang queue and not through zeromq. The wrapper contains the time that the message was "sent". The backend concurrently pulls from the front of the golang queue one at a time and checks if the message is expired. If so, don't send, if not, send the message. I could have the backend add the message to the golang queue first and then truly send it out after in the same block of code. That way, I don't need a lock.
2) Send the wrapper message through zeromq to a retriever and the retriever checks if its expired and returns early. I don't like this because it seems like its bad for performance.