On Python, the ZeroMQ .recv()/.send()
operations are blocking, which is just perfect for REQ/REP
.
In Golang, I must pass a zmq.DONTWAIT
to the .recv()
and .send()
operation in order to make it work.
But the thing is, the flow needs to be lock step, so:
server.recv()
client.send()
client.recv()
server.send()
And between 3 and 4 the weirdness starts, because they are async.
When the client has sent a message and the server has not received it yet but client tries to receive a response, the lock step is no lock step any more.
Is there some kind of zmq.DOBLOCK
in contrast to zmq.DONTWAIT
?
Or did I get something wrong here?
EDIT:
I am using this go binding in C for zeromq: https://godoc.org/github.com/pebbe/zmq4#Type
As you can see here the .recv()
needs a input flag
, which is one of the both on the second ref:
Recv: https://godoc.org/github.com/pebbe/zmq4#Socket.Recv
Flags to be passed: https://github.com/pebbe/zmq4/blob/master/zmq4.go#L403
This is the current code I got to make a workaround which feels somewhat ugly:
package connection
import (
"zmq4"
"fmt"
"time"
)
const ERRTMPUNAV="resource temporarily unavailable"
func checkError(e error){
if e != nil {
panic(e)
}
}
func CreateRepNode(address string,onMessage chan<- string,send <-chan string,closeConn <-chan bool){
stop:=false
socket,err:=zmq4.NewSocket(zmq4.REP)
checkError(err)
err=socket.Bind(address)
checkError(err)
go func(socket *zmq4.Socket){
for {
msg,err:=socket.Recv(zmq4.DONTWAIT)
fmt.Println("server message"+msg)
if stop==true {
return
}
if err != nil {
rateLimit := time.Tick(100 * time.Millisecond)
<-rateLimit
continue
}
checkError(err)
onMessage<-msg
rep:=<-send
_,err=socket.Send(rep,zmq4.DONTWAIT)
}
}(socket)
<-closeConn
stop=true
}
func CreateReqNode(address string,onMessage chan<- string,send <-chan string,closeConn <-chan bool){
stop:=false
socket,err:=zmq4.NewSocket(zmq4.REQ)
checkError(err)
err=socket.Connect(address)
checkError(err)
go func(){
for {
msg:=<-send
if stop==true {
return
}
_,err:=socket.Send(msg,zmq4.DONTWAIT)
for {
msg,err=socket.Recv(zmq4.DONTWAIT)
fmt.Println("client got message "+msg)
if err!=nil {
if err.Error()==ERRTMPUNAV {
w:=time.Tick(100*time.Millisecond)
<-w
continue
}
}
break
}
onMessage<-msg
}
}()
<-closeConn
stop=true
}