dongliu6848 2017-06-21 22:49
浏览 261
已采纳

如何使用ZeroMQ来使已出站排队但尚未在设定时间内发送的消息超时?

I have an application running on a server which takes requests from a phone app and then load balances the request across worker servers. I'm trying to add a timeout in the case that messages on the main server that have been in the outbound queue for the length of the timeout are removed from the queue. More specifically, the application on the main server is written in golang and implements the Paranoid Pirate Pattern of load balancing. The code I currently have is:

import (
    "fmt"
    zmq "github.com/pebbe/zmq4"
    "time"
)

const (
    HEARTBEAT_LIVENESS = 3
    HEARTBEAT_INTERVAL = 1500 * time.Millisecond

    MESSAGE_READY     = "\001"
    MESSAGE_HEARTBEAT = "\002"
)

var (
    client *zmq.Socket
    backend *zmq.Socket
    frontend *zmq.Socket
    workerPoller *zmq.Poller
    brokerPoller *zmq.Poller
    workerQueue []Worker
)

type Worker struct {
    Id string
    Expire time.Time
}

type RequestWrapper {
    RequestToSend Request

}

func NewWorker(id string) Worker {
    return Worker{
        Id: id,
        Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
    }
}

func AddReadyWorker(workers []Worker, worker Worker) []Worker {
    fmt.Println(worker.Id, " joined")
    for i, w := range workers {
        if worker.Id == w.Id {
            if i == 0 {
                workers = workers[1:]
            } else if i == len(workers)-1 {
                workers = workers[:i]
            } else {
                workers = append(workers[:i], workers[i+1:]...)
            }
            break
        }
    }
    return append(workers, worker)
}

func PurgeInactiveWorkers() {
    now := time.Now()
    for i, worker := range workerQueue {
        if now.Before(worker.Expire) {
            workerQueue = workerQueue[i:]
            return
        }
    }

    workerQueue = workerQueue[0:0]
}

func LoadBalance() {
// Loop:
    heartbeat := time.Tick(HEARTBEAT_INTERVAL)
    for {
        var sockets []zmq.Polled

        // If you have available workers, poll on the both front and backend
        // If not poll on backend with infinite timeout
        if len(workerQueue) > 0 {
            sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
        } else {
            sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
        }

        for _, socket := range sockets {
            switch socket.Socket {
                // backend is a router
                case backend:
                    workerId, _ := backend.Recv(0)
                    workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
                    clientId, _ := backend.Recv(0)
                    if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
                        route, _ := backend.Recv(0)
                        message, _ := backend.RecvBytes(0)

                        fmt.Println("Received response")
                        RouteResponse(route, message)

                        // frontend.Send(clientId, zmq.SNDMORE)
                        // frontend.Send("", zmq.SNDMORE)
                        // frontend.SendBytes(message, 0)
                    }
                // frontend is a dealer
                case frontend:
                    clientId, _ := frontend.Recv(0)
                    route, _ := frontend.Recv(0)
                    message, _ := frontend.RecvBytes(0)

                    backend.Send(workerQueue[0].Id, zmq.SNDMORE)
                    backend.Send(clientId, zmq.SNDMORE)
                    backend.Send(route, zmq.SNDMORE)
                    backend.SendBytes(message, 0)

                    workerQueue = workerQueue[1:]
            }
        }

        select {
            case <-heartbeat:
                for _, worker := range workerQueue {
                    backend.Send(worker.Id, zmq.SNDMORE)
                    backend.Send(MESSAGE_HEARTBEAT, 0)
                }
                break
            default:
        }

        PurgeInactiveWorkers()
    }
}

If the backend sends a message, but it is not actually sent to a worker in some period of time, I want it to expire and not ever be sent. Is there a socket option that can accomplish this? If not, what would I have to do to accomplish this?

Two ways I think I can do this without socket options are:

1) Have the backend wrap the message in a wrapper and send to a golang queue and not through zeromq. The wrapper contains the time that the message was "sent". The backend concurrently pulls from the front of the golang queue one at a time and checks if the message is expired. If so, don't send, if not, send the message. I could have the backend add the message to the golang queue first and then truly send it out after in the same block of code. That way, I don't need a lock.

2) Send the wrapper message through zeromq to a retriever and the retriever checks if its expired and returns early. I don't like this because it seems like its bad for performance.

  • 写回答

3条回答 默认 最新

  • dongye1942 2017-07-06 23:15
    关注

    In the end, the solution was to add an expires-at property like @colini and @bazza mentioned, and to drop timed out messages from the queue after each heartbeat. However, doing so and satisfying all requirements of my application was proving to be more difficult than first glance, so I ended up using RabbitMQ, whose ttl-expires argument provided the desired functionality.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(2条)

报告相同问题?

悬赏问题

  • ¥60 版本过低apk如何修改可以兼容新的安卓系统
  • ¥25 由IPR导致的DRIVER_POWER_STATE_FAILURE蓝屏
  • ¥50 有数据,怎么建立模型求影响全要素生产率的因素
  • ¥50 有数据,怎么用matlab求全要素生产率
  • ¥15 TI的insta-spin例程
  • ¥15 完成下列问题完成下列问题
  • ¥15 C#算法问题, 不知道怎么处理这个数据的转换
  • ¥15 YoloV5 第三方库的版本对照问题
  • ¥15 请完成下列相关问题!
  • ¥15 drone 推送镜像时候 purge: true 推送完毕后没有删除对应的镜像,手动拷贝到服务器执行结果正确在样才能让指令自动执行成功删除对应镜像,如何解决?