I'm using ZeroMQ's multiple-connect feature to connect a single `DEALER` socket to two `ROUTER` sockets:
              +----> .Connect() --> ROUTER 1
             /
    DEALER ---+------> .Connect() --> ROUTER 2
In my test, I send 10 messages through the `DEALER`. I get back a nice, even distribution of 5 messages to each of the `ROUTER`s.
My problem is that if `ROUTER 1` goes away for some reason, the `DEALER` still continues to queue messages for it, presumably on the assumption that `ROUTER 1` will eventually come back. As a result, only 5 of the 10 messages arrive at `ROUTER 2`.
What I need is for the `DEALER` to ignore disconnected or failed peers. Is this possible?
I've tried setting `ZMQ_SNDHWM` and many other options, but nothing seems to work.
The only alternative I can see is to do the failover myself, with separate sockets, heartbeats and ACK packets etc. It seems like such a basic pattern should already be implemented by ZeroMQ.
Edit: testing code
package main
import (
"github.com/pebbe/zmq4"
"time"
"log"
"fmt"
)
func receiveAll(sok *zmq4.Socket) (received int) {
poller := zmq4.NewPoller()
poller.Add(sok, zmq4.POLLIN)
for {
sockets, err := poller.Poll(100 * time.Millisecond)
if err != nil {
log.Print(err)
}
if len(sockets) > 0 {
for _, s := range sockets {
msg, _ := s.Socket.RecvMessageBytes(0)
if string(msg[1]) != "Hello World" {
log.Fatalf("Unexpected message: %s
", msg)
}
received ++
}
} else {
return
}
}
}
func main() {
dealer, _ := zmq4.NewSocket(zmq4.DEALER)
router1, _ := zmq4.NewSocket(zmq4.ROUTER)
router2, _ := zmq4.NewSocket(zmq4.ROUTER)
router1.Bind("tcp://0.0.0.0:6667")
router2.Bind("tcp://0.0.0.0:6668")
dealer.Connect("tcp://0.0.0.0:6667")
dealer.Connect("tcp://0.0.0.0:6668")
router1.SetSubscribe("")
router2.SetSubscribe("")
dealer.SetSubscribe("")
for i := 0; i < 10; i++ {
dealer.SendBytes([]byte("Hello World"), 0)
}
time.Sleep(300 * time.Millisecond)
count1 := receiveAll(router1)
count2 := receiveAll(router2)
fmt.Printf("Blue sky scenario: count1=%d count2=%d
", count1, count2)
// Shut down a peer
router1.Close()
time.Sleep(300 * time.Millisecond)
for i := 0; i < 10; i++ {
dealer.SendBytes([]byte("Hello World"), 0)
}
time.Sleep(300 * time.Millisecond)
count := receiveAll(router2)
fmt.Printf("Peer 1 offline: count=%d
", count)
}