I've been using ZMQ in some Python applications for a while, but only recently decided to reimplement one of them in Go, and that is when I realized that ZMQ sockets are not thread-safe.

The original Python implementation uses an event loop that looks like this:

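# Excerpt: poller, router, requests, replies and handle_request are set up elsewhere.
while running:
    socks = dict(poller.poll(TIMEOUT))
    if socks.get(router) == zmq.POLLIN:
        client_id = router.recv()
        _ = router.recv()  # empty delimiter frame
        data = router.recv()
        requests.append((client_id, data))

    # Requests whose reply isn't ready yet stay queued for the next pass.
    pending = []
    for req in requests:
        rep = handle_request(req)
        if rep:
            replies.append(rep)
        else:
            pending.append(req)
    requests = pending

    for client_id, data in replies:
        router.send(client_id, zmq.SNDMORE)
        router.send(b'', zmq.SNDMORE)
        router.send(data)
    del replies[:]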

The problem is that a reply might not be ready on the first pass, so whenever there are pending requests I have to poll with a very short timeout or the clients wait longer than they should. As a result, the application burns a lot of CPU just polling.

When I decided to reimplement it in Go, I thought it would be as simple as this, avoiding the problem by polling with an infinite timeout:

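for {
    sockets, _ := poller.Poll(-1) // block until a socket is readable
    for _, socket := range sockets {
        switch s := socket.Socket; s {
        case router:
            msg, _ := s.RecvMessage(0)
            client_id := msg[0]
            data := msg[2]
            // router is handed to a new goroutine here, which is what
            // ends up sharing the socket between threads.
            go handleRequest(router, client_id, data)
        }
    }
}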

But that ideal implementation only works when I have a single client connected, or a light load. Under heavy load I get random assertion errors inside libzmq. I tried the following:

  1. Following the zmq4 docs, I tried adding a sync.Mutex and locking/unlocking around all socket operations. It fails. I assume it's because ZMQ uses its own threads for flushing.

  2. Creating one goroutine for polling/receiving and one for sending, using channels the same way I used the req/rep queues in the Python version. It fails, as I'm still sharing the socket.

  3. Same as 2, but setting GOMAXPROCS=1. It fails, and throughput was very limited because replies were being held back until the Poll() call returned.

  4. Use the req/rep channels as in 2, but use runtime.LockOSThread to keep all socket operations on the same thread as the socket. It doesn't fail, but it has the same problem as 3: throughput was very limited.

  5. Same as 4, but using the poll timeout strategy from the Python version. It works, but has the same problem the Python version does.

  6. Share the context instead of the socket and create one socket for sending and one for receiving in separate goroutines, communicating with channels. It works, but I'll have to rewrite the client libs to use two sockets instead of one.

  7. Get rid of zmq and use raw TCP sockets, which are thread-safe. It works perfectly, but I'll also have to rewrite the client libs.

So, it looks like 6 is how ZMQ was really intended to be used, as that's the only way I got it to work seamlessly with goroutines, but I wonder if there's any other way I haven't tried. Any ideas?


With the answers here I realized I can just add an inproc PULL socket to the poller and have a goroutine connect and push a byte to break out of the infinite wait. It's not as versatile as the solutions suggested here, but it works and I can even backport it to the Python version.
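For reference, here is a minimal sketch of that wake-up trick as a Python backport, assuming pyzmq and REQ clients (so the ROUTER sees identity, empty frame, payload). The endpoint name `inproc://wake`, the `worker` and `serve` functions, the `max_replies` and `ready` parameters, and the upper-casing "handler" are all made up for illustration; a thread plays the role of the goroutine.

```python
import queue
import threading

import zmq

WAKE_ADDR = "inproc://wake"  # hypothetical endpoint name


def worker(ctx, client_id, data, replies):
    """Stand-in for handle_request running off the event loop."""
    replies.put((client_id, data.upper()))  # queue the reply first
    wake = ctx.socket(zmq.PUSH)  # private to this thread, never shared
    wake.connect(WAKE_ADDR)
    wake.send(b"\0")  # one byte is enough to end the blocking poll
    wake.close()


def serve(ctx, endpoint, max_replies, ready=None):
    """Event loop owning the ROUTER socket; exits after max_replies replies."""
    router = ctx.socket(zmq.ROUTER)
    router.bind(endpoint)
    wake = ctx.socket(zmq.PULL)
    wake.bind(WAKE_ADDR)
    if ready is not None:
        ready.set()  # lets callers wait until both sockets are bound

    poller = zmq.Poller()
    poller.register(router, zmq.POLLIN)
    poller.register(wake, zmq.POLLIN)

    replies = queue.Queue()
    sent = 0
    while sent < max_replies:
        socks = dict(poller.poll())  # no timeout: sleeps until an event
        if router in socks:
            client_id, _, data = router.recv_multipart()
            threading.Thread(
                target=worker, args=(ctx, client_id, data, replies)
            ).start()
        if wake in socks:
            wake.recv()  # each wake-up byte stands for one queued reply
            client_id, data = replies.get_nowait()
            router.send_multipart([client_id, b"", data])
            sent += 1

    router.close()
    wake.close()


if __name__ == "__main__":
    ctx = zmq.Context()
    ready = threading.Event()
    server = threading.Thread(target=serve, args=(ctx, "inproc://demo", 1, ready))
    server.start()
    ready.wait()
    req = ctx.socket(zmq.REQ)
    req.connect("inproc://demo")
    req.send(b"ping")
    print(req.recv())
    req.close()
    server.join()
    ctx.term()
```

The reply is queued before the byte is pushed, so every byte the loop reads has a reply waiting for it, and the poll timeout can stay infinite. The worker never touches the ROUTER socket; it only uses its own short-lived PUSH socket.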

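dsfd3546: Because I can't use channels with the ZMQ poller. It would be great if I could use ZMQ sockets in Go's select, or channels in the ZMQ poller.
dtv11049: I'm new to the Go world. Is there any reason, other than consistency, to use inproc sockets rather than a hybrid system with ZMQ for external communication and Go channels for internal communication?
douhanzhen8927: Which ZMQ library are you using on the Go side?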



I opened an issue about 1.5 years ago to introduce a port of a channels abstraction to pebbe/zmq4. Ultimately the author decided against it, but we have used this in production (under VERY heavy workloads) for a long time now.

This is a gist of the file that had to be added to the pebbe/zmq4 package (since it adds methods to the Socket). It could be rewritten so that the functions take a Socket as an argument instead of being methods on the Socket receiver, but since we vendor our code anyway, this was an easy way forward.

The basic usage is to create your Socket like normal (call it s for example) then you can:

channels := s.Channels()
outBound := channels.Out()
inBound := channels.In()

Now you have two channels of type [][]byte that you can use between goroutines, while a single goroutine, managed within the channels abstraction, is responsible for managing the Poller and communicating with the socket.
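The gist itself isn't reproduced here, so what follows is only a rough Python analogue of the idea, not the actual zmq4 code. `queue.Queue` objects stand in for the Go channels, one thread owns the socket and its poller, and a second thread feeds outbound messages to it over an inproc PUSH/PULL pair. It assumes pyzmq; the class name `SocketChannels`, its attributes, and the `msg`/`stop` tag frames are hypothetical.

```python
import itertools
import queue
import threading

import zmq

_ids = itertools.count()  # gives each wrapper its own inproc endpoint


class SocketChannels:
    """Hypothetical queue-based analogue of s.Channels(); not the gist's code."""

    def __init__(self, ctx, sock_type, endpoint):
        self.inbound = queue.Queue()   # multipart messages read from the socket
        self.outbound = queue.Queue()  # multipart messages to write to it
        self._ctx = ctx
        self._addr = "inproc://channels-%d" % next(_ids)
        self._bound = threading.Event()
        self._owner = threading.Thread(target=self._own, args=(sock_type, endpoint))
        self._feeder = threading.Thread(target=self._feed)
        self._owner.start()
        self._bound.wait()  # the feeder connects only after the bind
        self._feeder.start()

    def _own(self, sock_type, endpoint):
        """The only thread that ever touches the wrapped socket."""
        sock = self._ctx.socket(sock_type)
        sock.bind(endpoint)
        pull = self._ctx.socket(zmq.PULL)
        pull.bind(self._addr)
        self._bound.set()
        poller = zmq.Poller()
        poller.register(sock, zmq.POLLIN)
        poller.register(pull, zmq.POLLIN)
        running = True
        while running:
            socks = dict(poller.poll())  # blocks; no timeout needed
            if sock in socks:
                self.inbound.put(sock.recv_multipart())
            if pull in socks:
                tag, *frames = pull.recv_multipart()
                if tag == b"stop":
                    running = False
                else:
                    sock.send_multipart(frames)
        sock.close()
        pull.close()

    def _feed(self):
        """Blocks on the outbound queue and forwards items over inproc."""
        push = self._ctx.socket(zmq.PUSH)
        push.connect(self._addr)
        while True:
            frames = self.outbound.get()
            if frames is None:  # sentinel queued by close()
                push.send_multipart([b"stop"])
                break
            push.send_multipart([b"msg", *frames])
        push.close()

    def close(self):
        """Flush queued outbound messages, then stop both threads."""
        self.outbound.put(None)
        self._feeder.join()
        self._owner.join()
```

Usage would look roughly like `ch = SocketChannels(ctx, zmq.ROUTER, "tcp://*:5555")`, then `ch.inbound.get()` and `ch.outbound.put([client_id, b"", reply])` from any thread, and `ch.close()` when done. Error handling (a failed bind, for example) is left out of the sketch.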

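douba8758: Forget my previous comment. I got it. Essentially, what it does is use inproc sockets to break the infinite timeout and wrap everything in channels to make it more seamless. Nice stuff. Thanks.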


The blessed way to do this with pebbe/zmq4 is with a Reactor. Reactors have the ability to listen on Go channels, but you don't want to do that because they do so by polling the channel periodically using a poll timeout, which reintroduces the same exact problem you have in your Python version. Instead you can use zmq inproc sockets, with one end held by the reactor and the other end held by a goroutine that passes data in from a channel. It's complicated, verbose, and unpleasant, but I have used it successfully.

duanhao9176: Clever. Glad I could help in some way.
dongxiao9583: I got it, but I just realized I can simply use an inproc socket to break the poller.Poll(-1) call, and I don't even have to use the Reactor. Thanks.