dongliu6848
dongliu6848
2017-06-21 22:49
浏览 221
已采纳

如何使用ZeroMQ来使已出站排队但尚未在设定时间内发送的消息超时?

I have an application running on a server which takes requests from a phone app and then load balances the request across worker servers. I'm trying to add a timeout in the case that messages on the main server that have been in the outbound queue for the length of the timeout are removed from the queue. More specifically, the application on the main server is written in golang and implements the Paranoid Pirate Pattern of load balancing. The code I currently have is:

import (
    "fmt"
    zmq "github.com/pebbe/zmq4"
    "time"
)

const (
    HEARTBEAT_LIVENESS = 3
    HEARTBEAT_INTERVAL = 1500 * time.Millisecond

    MESSAGE_READY     = "\001"
    MESSAGE_HEARTBEAT = "\002"
)

var (
    client *zmq.Socket
    backend *zmq.Socket
    frontend *zmq.Socket
    workerPoller *zmq.Poller
    brokerPoller *zmq.Poller
    workerQueue []Worker
)

type Worker struct {
    Id string
    Expire time.Time
}

type RequestWrapper {
    RequestToSend Request

}

func NewWorker(id string) Worker {
    return Worker{
        Id: id,
        Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
    }
}

func AddReadyWorker(workers []Worker, worker Worker) []Worker {
    fmt.Println(worker.Id, " joined")
    for i, w := range workers {
        if worker.Id == w.Id {
            if i == 0 {
                workers = workers[1:]
            } else if i == len(workers)-1 {
                workers = workers[:i]
            } else {
                workers = append(workers[:i], workers[i+1:]...)
            }
            break
        }
    }
    return append(workers, worker)
}

func PurgeInactiveWorkers() {
    now := time.Now()
    for i, worker := range workerQueue {
        if now.Before(worker.Expire) {
            workerQueue = workerQueue[i:]
            return
        }
    }

    workerQueue = workerQueue[0:0]
}

func LoadBalance() {
// Loop:
    heartbeat := time.Tick(HEARTBEAT_INTERVAL)
    for {
        var sockets []zmq.Polled

        // If you have available workers, poll on the both front and backend
        // If not poll on backend with infinite timeout
        if len(workerQueue) > 0 {
            sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
        } else {
            sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
        }

        for _, socket := range sockets {
            switch socket.Socket {
                // backend is a router
                case backend:
                    workerId, _ := backend.Recv(0)
                    workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
                    clientId, _ := backend.Recv(0)
                    if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
                        route, _ := backend.Recv(0)
                        message, _ := backend.RecvBytes(0)

                        fmt.Println("Received response")
                        RouteResponse(route, message)

                        // frontend.Send(clientId, zmq.SNDMORE)
                        // frontend.Send("", zmq.SNDMORE)
                        // frontend.SendBytes(message, 0)
                    }
                // frontend is a dealer
                case frontend:
                    clientId, _ := frontend.Recv(0)
                    route, _ := frontend.Recv(0)
                    message, _ := frontend.RecvBytes(0)

                    backend.Send(workerQueue[0].Id, zmq.SNDMORE)
                    backend.Send(clientId, zmq.SNDMORE)
                    backend.Send(route, zmq.SNDMORE)
                    backend.SendBytes(message, 0)

                    workerQueue = workerQueue[1:]
            }
        }

        select {
            case <-heartbeat:
                for _, worker := range workerQueue {
                    backend.Send(worker.Id, zmq.SNDMORE)
                    backend.Send(MESSAGE_HEARTBEAT, 0)
                }
                break
            default:
        }

        PurgeInactiveWorkers()
    }
}

If the backend sends a message, but it is not actually sent to a worker in some period of time, I want it to expire and not ever be sent. Is there a socket option that can accomplish this? If not, what would I have to do to accomplish this?

Two ways I think I can do this without socket options are:

1) Have the backend wrap the message in a wrapper and send to a golang queue and not through zeromq. The wrapper contains the time that the message was "sent". The backend concurrently pulls from the front of the golang queue one at a time and checks if the message is expired. If so, don't send, if not, send the message. I could have the backend add the message to the golang queue first and then truly send it out after in the same block of code. That way, I don't need a lock.

2) Send the wrapper message through zeromq to a retriever and the retriever checks if its expired and returns early. I don't like this because it seems like its bad for performance.

  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 邀请回答

3条回答 默认 最新

  • dongye1942
    dongye1942 2017-07-06 23:15
    已采纳

    In the end, the solution was to add an expires-at property like @colini and @bazza mentioned, and to drop timed out messages from the queue after each heartbeat. However, doing so and satisfying all requirements of my application was proving to be more difficult than first glance, so I ended up using RabbitMQ, whose ttl-expires argument provided the desired functionality.

    点赞 评论
  • doufan1363
    doufan1363 2017-06-21 22:59

    In newer API-versions, there is an option to discard all "old" messages and always deliver just the "newest" one.

    If that meets your expectations, and if all peers meet API v.4.0+, you are done.

    ZMQ_CONFLATE: Keep only last message

    If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores ZMQ_RCVHWM and ZMQ_SNDHWM options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.
    Option value type int
    Option value unit boolean
    Default value 0 (false)
    Applicable socket types ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER

    点赞 评论
  • douguomou5094
    douguomou5094 2017-06-22 05:04

    What you're trying to do is use communication as an execution rendezvous. The sender wants to know something about when the receiver gets messages.

    ZMQ implements the Actor model. What you need is a modification of the Communicating Sequential Processes model (one where sends timeout). Basically you need to add control message flows to/from the workers, the idea being that the server asks the worker to receive a message and the server waits for the reply. The reply means that the worker is ready to receive a message right now, and that the server and worker have both rendezvoused at a send/receive in their program flows. If that reply fails to arrive within timeout seconds, then the server doesn't send the actual message.

    Or you could cheat by having everything going to the workers regardless, wrapped in a message that carries a "sent at time X" field, and have the worker decide to discard old messages.

    点赞 评论

相关推荐