dousi1097 2018-05-31 04:57

ZeroMQ round-robin failover on disconnected peers

I'm using ZeroMQ's multiple-connect feature to connect a single DEALER to two ROUTERs:

            +----> .Connect() --> ROUTER 1
           /
DEALER ---+------> .Connect() --> ROUTER 2

In my test, I send 10 messages through the DEALER and get back a nice even distribution of 5 messages to each of the ROUTERs.

My problem is that if ROUTER 1 goes away for some reason, the DEALER still continues to queue messages for it, I think on the assumption that ROUTER 1 will eventually come back. I end up with only 5 messages on ROUTER 2.
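The behaviour described above can be reproduced with a toy model. This is only a sketch of the observed round-robin queueing, not libzmq code; the names `pipe`, `dealer`, and `simulate` are hypothetical and exist purely for illustration. The point is that `send` never looks at whether the peer is connected.

```go
package main

import "fmt"

// pipe is a toy stand-in for one outgoing connection of the DEALER.
type pipe struct {
	connected bool // recorded, but deliberately ignored by send
	queued    int  // messages handed to this pipe
}

// dealer round-robins over every pipe it has ever connected.
type dealer struct {
	pipes []*pipe
	next  int
}

// send puts one message on the next pipe in turn, connected or not.
func (d *dealer) send() {
	p := d.pipes[d.next]
	d.next = (d.next + 1) % len(d.pipes)
	p.queued++
}

// simulate sends n messages while ROUTER 1 is offline and reports how many
// ended up queued for the dead peer and how many went to the live one.
func simulate(n int) (dead, alive int) {
	r1 := &pipe{connected: false} // ROUTER 1 has gone away
	r2 := &pipe{connected: true}  // ROUTER 2 is still up
	d := &dealer{pipes: []*pipe{r1, r2}}
	for i := 0; i < n; i++ {
		d.send()
	}
	return r1.queued, r2.queued
}

func main() {
	dead, alive := simulate(10)
	fmt.Printf("queued for ROUTER 1 (offline): %d\n", dead)
	fmt.Printf("sent to ROUTER 2: %d\n", alive)
}
```

With 10 messages the model splits them 5 and 5, which matches the count seen on ROUTER 2 in the test.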

What I need to happen is for DEALER to ignore disconnected or failed peers. Is this possible?

I've tried setting ZMQ_SNDHWM and many others, but nothing seems to work.

The only alternative I can see is to do the failover myself, with separate sockets, heartbeats and ACK packets etc. It seems like such a basic pattern should already be implemented by ZeroMQ.
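For reference, the hand-rolled alternative could look roughly like the sketch below: keep a last-heard timestamp per peer (updated by heartbeats or ACKs) and round-robin only over peers heard from recently. Everything here is hypothetical (`balancer`, `heartbeat`, `pick`, the one-second timeout), and the socket handling itself is left out.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// balancer remembers when each peer was last heard from and hands out
// only peers that are still considered alive.
type balancer struct {
	peers    []string
	lastSeen map[string]time.Time
	timeout  time.Duration
	next     int
}

func newBalancer(timeout time.Duration, peers ...string) *balancer {
	return &balancer{peers: peers, lastSeen: make(map[string]time.Time), timeout: timeout}
}

// heartbeat records that a peer answered (heartbeat or ACK) at time now.
func (b *balancer) heartbeat(peer string, now time.Time) {
	b.lastSeen[peer] = now
}

// pick returns the next live peer in round-robin order, skipping any peer
// whose last heartbeat is older than the timeout.
func (b *balancer) pick(now time.Time) (string, error) {
	for i := 0; i < len(b.peers); i++ {
		p := b.peers[(b.next+i)%len(b.peers)]
		seen, ok := b.lastSeen[p]
		if ok && now.Sub(seen) <= b.timeout {
			b.next = (b.next + i + 1) % len(b.peers)
			return p, nil
		}
	}
	return "", errors.New("no live peers")
}

// demo lets router1 fall silent for three seconds and picks four times.
func demo() []string {
	start := time.Now()
	b := newBalancer(time.Second, "router1", "router2")
	b.heartbeat("router1", start)
	b.heartbeat("router2", start)

	now := start.Add(3 * time.Second)
	b.heartbeat("router2", now) // only router2 is heard from again

	var picks []string
	for i := 0; i < 4; i++ {
		peer, err := b.pick(now)
		if err != nil {
			peer = "none"
		}
		picks = append(picks, peer)
	}
	return picks
}

func main() {
	fmt.Println(demo())
}
```

In a real setup each peer would get its own socket, and `pick` would decide which socket the next message goes to.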


Edit: testing code

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/pebbe/zmq4"
)

func receiveAll(sok *zmq4.Socket) (received int) {
    poller := zmq4.NewPoller()
    poller.Add(sok, zmq4.POLLIN)

    for {
        sockets, err := poller.Poll(100 * time.Millisecond)
        if err != nil {
            log.Print(err)
        }
        if len(sockets) > 0 {
            for _, s := range sockets {
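                msg, err := s.Socket.RecvMessageBytes(0)
                if err != nil {
                    log.Fatal(err)
                }
                // A ROUTER prepends the sender's identity frame, so the payload is frame 1.
                if len(msg) < 2 || string(msg[1]) != "Hello World" {
                    log.Fatalf("Unexpected message: %s\n", msg)
                }
                received++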
            }
        } else {
            return
        }
    }
}

func main() {

    dealer, _ := zmq4.NewSocket(zmq4.DEALER)
    router1, _ := zmq4.NewSocket(zmq4.ROUTER)
    router2, _ := zmq4.NewSocket(zmq4.ROUTER)

    router1.Bind("tcp://0.0.0.0:6667")
    router2.Bind("tcp://0.0.0.0:6668")

    dealer.Connect("tcp://127.0.0.1:6667")
    dealer.Connect("tcp://127.0.0.1:6668")

    for i := 0; i < 10; i++ {
        dealer.SendBytes([]byte("Hello World"), 0)
    }

    time.Sleep(300 * time.Millisecond)

    count1 := receiveAll(router1)
    count2 := receiveAll(router2)

    fmt.Printf("Blue sky scenario: count1=%d count2=%d\n", count1, count2)

    // Shut down a peer
    router1.Close()
    time.Sleep(300 * time.Millisecond)

    for i := 0; i < 10; i++ {
        dealer.SendBytes([]byte("Hello World"), 0)
    }

    time.Sleep(300 * time.Millisecond)

    count := receiveAll(router2)

    fmt.Printf("Peer 1 offline: count=%d\n", count)

}

1 answer

  • dongxin5429 2018-05-31 16:28

    What I need to happen is for DEALER to ignore disconnected or failed peers. Is this possible?

    Oh sure, it is. These features are inactive by default, so you need to change the defaults to values that suit your use case:

    • a .setsockopt( ZMQ.IMMEDIATE, 1 ) so that messages are not buffered for peers that do not seem to be "alive"
    • a .setsockopt( ZMQ.HEARTBEAT_IVL, <ms> ) for the interval between heartbeats sent to the peer
    • a .setsockopt( ZMQ.HEARTBEAT_TTL, <ms> ) for the Time-To-Live the remote peer should apply to the connection
    • a .setsockopt( ZMQ.HEARTBEAT_TIMEOUT, <ms> ) for how long to wait for a heartbeat reply before the connection is considered dead
    • a .setsockopt( ZMQ.HANDSHAKE_IVL, <ms> ) for managing (re-)establishment timeouts
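Since the question uses Go, here is a rough sketch of how the options above might be applied there. It assumes that `*zmq4.Socket` from github.com/pebbe/zmq4 provides the five setters named in the `failoverSetter` interface (which should be the case when the binding is built against libzmq 4.2 or newer, but check your version). The helper `configureFailover`, the stand-in `recorder`, and the interval multipliers are made up for illustration and are not tuned values. The stand-in lets the sketch run without libzmq installed.

```go
package main

import (
	"fmt"
	"time"
)

// failoverSetter lists the option setters used below. Assumption: the
// zmq4 socket type satisfies this interface.
type failoverSetter interface {
	SetImmediate(bool) error
	SetHeartbeatIvl(time.Duration) error
	SetHeartbeatTtl(time.Duration) error
	SetHeartbeatTimeout(time.Duration) error
	SetHandshakeIvl(time.Duration) error
}

// configureFailover applies the options and stops at the first error.
// The multipliers are illustrative only.
func configureFailover(s failoverSetter, ivl time.Duration) error {
	if err := s.SetImmediate(true); err != nil {
		return fmt.Errorf("ZMQ_IMMEDIATE: %w", err)
	}
	if err := s.SetHeartbeatIvl(ivl); err != nil {
		return fmt.Errorf("ZMQ_HEARTBEAT_IVL: %w", err)
	}
	if err := s.SetHeartbeatTtl(3 * ivl); err != nil {
		return fmt.Errorf("ZMQ_HEARTBEAT_TTL: %w", err)
	}
	if err := s.SetHeartbeatTimeout(2 * ivl); err != nil {
		return fmt.Errorf("ZMQ_HEARTBEAT_TIMEOUT: %w", err)
	}
	if err := s.SetHandshakeIvl(5 * ivl); err != nil {
		return fmt.Errorf("ZMQ_HANDSHAKE_IVL: %w", err)
	}
	return nil
}

// recorder is a stand-in socket that only records which options were set.
type recorder struct{ calls []string }

func (r *recorder) record(name string, d time.Duration) error {
	r.calls = append(r.calls, fmt.Sprintf("%s=%s", name, d))
	return nil
}

func (r *recorder) SetImmediate(v bool) error {
	r.calls = append(r.calls, fmt.Sprintf("immediate=%t", v))
	return nil
}
func (r *recorder) SetHeartbeatIvl(d time.Duration) error     { return r.record("heartbeat_ivl", d) }
func (r *recorder) SetHeartbeatTtl(d time.Duration) error     { return r.record("heartbeat_ttl", d) }
func (r *recorder) SetHeartbeatTimeout(d time.Duration) error { return r.record("heartbeat_timeout", d) }
func (r *recorder) SetHandshakeIvl(d time.Duration) error     { return r.record("handshake_ivl", d) }

// demoConfig runs the helper against the stand-in and returns the calls made.
func demoConfig() []string {
	r := &recorder{}
	if err := configureFailover(r, time.Second); err != nil {
		panic(err)
	}
	return r.calls
}

func main() {
	for _, c := range demoConfig() {
		fmt.Println(c)
	}
}
```

With the real binding, the idea would be to call `configureFailover(dealer, time.Second)` on the DEALER before the `dealer.Connect(...)` calls, since socket options generally take effect only for connections made afterwards.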

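    For details, check your language binding and which native API version it actually uses under the hood. Availability differs by option: ZMQ_IMMEDIATE dates back to the 3.x line (where it was called ZMQ_DELAY_ATTACH_ON_CONNECT), ZMQ_HANDSHAKE_IVL arrived with v4.1, and the ZMQ_HEARTBEAT_* options require v4.2 or newer. The native-API v4.2.2 documentation will help you tune the values and configuration strategies.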

    This answer was accepted by the asker as the best answer.
