dousi1097
2018-05-31 04:57

ZeroMQ round-robin failover on disconnected peers

I'm using ZeroMQ's multiple-connect feature to connect a single DEALER to two ROUTERs:

            +----> .Connect() --> ROUTER 1
           /
DEALER ---+------> .Connect() --> ROUTER 2

In my test, I send 10 messages through the DEALER and get back a nice even distribution: 5 messages to each ROUTER.

My problem is that if ROUTER 1 goes away for some reason, the DEALER keeps queuing messages for it, presumably on the assumption that ROUTER 1 will eventually come back. Of the next 10 messages, only 5 end up on ROUTER 2.
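To make the 5/5 split concrete, here is a toy model in Go (the language of the test code below). It is a deliberate simplification, not libzmq's implementation: each peer gets an outbound queue (`pipe`, a hypothetical type), and the `immediate` flag stands in for the ZMQ_IMMEDIATE socket option. Without that flag, the queue of a disconnected peer still takes its turn in the round-robin, so every other message is stranded there.

```go
package main

import "fmt"

// pipe is a toy stand-in for the per-peer outbound queue of a DEALER.
// Simplified model for illustration only, not libzmq's code.
type pipe struct {
	name      string
	connected bool
	queued    []string // messages buffered while waiting for the peer
	delivered int      // messages handed to a live peer
}

// roundRobin deals msgs over the pipes in turn. With immediate == false,
// a pipe whose peer is gone still takes its turn and buffers the message.
// With immediate == true, only pipes with a live connection take part.
func roundRobin(pipes []*pipe, msgs []string, immediate bool) {
	var active []*pipe
	for _, p := range pipes {
		if p.connected || !immediate {
			active = append(active, p)
		}
	}
	if len(active) == 0 {
		return // a real DEALER would block (or return EAGAIN) here
	}
	for i, m := range msgs {
		p := active[i%len(active)]
		if p.connected {
			p.delivered++
		} else {
			p.queued = append(p.queued, m)
		}
	}
}

func main() {
	msgs := make([]string, 10)
	for i := range msgs {
		msgs[i] = "Hello World"
	}
	for _, immediate := range []bool{false, true} {
		r1 := &pipe{name: "ROUTER 1", connected: false} // peer went away
		r2 := &pipe{name: "ROUTER 2", connected: true}
		roundRobin([]*pipe{r1, r2}, msgs, immediate)
		fmt.Printf("immediate=%v: %s delivered=%d, %s queued=%d\n",
			immediate, r2.name, r2.delivered, r1.name, len(r1.queued))
	}
}
```

Running it prints `immediate=false: ROUTER 2 delivered=5, ROUTER 1 queued=5` followed by `immediate=true: ROUTER 2 delivered=10, ROUTER 1 queued=0`, mirroring the symptom described above.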

What I need to happen is for DEALER to ignore disconnected or failed peers. Is this possible?

I've tried setting ZMQ_SNDHWM and many other options, but nothing seems to work.

The only alternative I can see is to implement the failover myself, with separate sockets, heartbeats, ACK packets, etc. Such a basic pattern seems like something ZeroMQ should already provide.


Edit: test code

package main

import (
    "github.com/pebbe/zmq4"
    "time"
    "log"
    "fmt"
)

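// receiveAll drains sok until no message arrives within 100 ms and
// returns how many messages it received.
func receiveAll(sok *zmq4.Socket) (received int) {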
    poller := zmq4.NewPoller()
    poller.Add(sok, zmq4.POLLIN)

    for {
        sockets, err := poller.Poll(100 * time.Millisecond)
        if err != nil {
            log.Print(err)
        }
        if len(sockets) > 0 {
            for _, s := range sockets {
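                msg, err := s.Socket.RecvMessageBytes(0)
                if err != nil {
                    log.Fatal(err)
                }
                // A ROUTER prepends the sender's identity frame, so the payload is msg[1].
                if len(msg) < 2 || string(msg[1]) != "Hello World" {
                    log.Fatalf("Unexpected message: %q", msg)
                }
                received++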
            }
        } else {
            return
        }
    }
}

func main() {

    dealer, _ := zmq4.NewSocket(zmq4.DEALER)
    router1, _ := zmq4.NewSocket(zmq4.ROUTER)
    router2, _ := zmq4.NewSocket(zmq4.ROUTER)

    router1.Bind("tcp://0.0.0.0:6667")
    router2.Bind("tcp://0.0.0.0:6668")

    // 0.0.0.0 is a wildcard for Bind; connect to the loopback address.
    dealer.Connect("tcp://127.0.0.1:6667")
    dealer.Connect("tcp://127.0.0.1:6668")

    for i := 0; i < 10; i++ {
        dealer.SendBytes([]byte("Hello World"), 0)
    }

    time.Sleep(300 * time.Millisecond)

    count1 := receiveAll(router1)
    count2 := receiveAll(router2)

    fmt.Printf("Blue sky scenario: count1=%d count2=%d\n", count1, count2)

    // Shut down a peer
    router1.Close()
    time.Sleep(300 * time.Millisecond)

    for i := 0; i < 10; i++ {
        dealer.SendBytes([]byte("Hello World"), 0)
    }

    time.Sleep(300 * time.Millisecond)

    count := receiveAll(router2)

    fmt.Printf("Peer 1 offline: count=%d\n", count)

}

1 answer

  • dongxin5429 2018-05-31 16:28
    Accepted answer

    What I need to happen is for DEALER to ignore disconnected or failed peers. Is this possible?

    Oh sure, it is. You need to change the default (inactive) values to settings specific to your use case:

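    • a .setsockopt( ZMQ.IMMEDIATE, 1 ) to queue messages only on connections that have actually completed, instead of buffering them for peers that do not seem to be "alive"
    • a .setsockopt( ZMQ.HEARTBEAT_IVL, <ms> ) to send a ZMTP PING heartbeat every <ms> milliseconds
    • a .setsockopt( ZMQ.HEARTBEAT_TTL, <ms> ) to tell the remote peer to drop the connection if it receives no traffic from this socket within <ms>
    • a .setsockopt( ZMQ.HEARTBEAT_TIMEOUT, <ms> ) to drop the connection locally if no traffic arrives within <ms> after a PING was sent
    • a .setsockopt( ZMQ.HANDSHAKE_IVL, <ms> ) to close a (re-)established connection whose handshake does not complete within <ms>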

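    For details, check your language binding and which native API version it actually uses under the hood. These settings are not equally old: the IMMEDIATE behaviour dates back to native-API v3.2 (where it was called ZMQ_DELAY_ATTACH_ON_CONNECT), HANDSHAKE_IVL to v4.0, while the HEARTBEAT_* options need v4.2 or newer. The native-API v4.2.2 documentation, the most recent at the time of writing, will help you tune the values and configuration strategies.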

