douxishai8552 2016-04-05 21:57
浏览 477


I've been using ZMQ in some Python applications for a while, but only very recently I decided to reimplement one of them in Go and I realized that ZMQ sockets are not thread-safe.

The original Python implementation uses an event loop that looks like this:

while running:
    socks = dict(poller.poll(TIMEOUT))
    if socks.get(router) == zmq.POLLIN:
        client_id = router.recv()
        _ = router.recv()
        data = router.recv()
        requests.append((client_id, data))

    for req in requests:
        rep = handle_request(req)
        if rep:

    for client_id, data in replies:
        router.send(client_id, zmq.SNDMORE)
        router.send(b'', zmq.SNDMORE)
        del replies[:]

The problem is that the reply might not be ready on the first pass, so whenever I have pending requests, I have to poll with a very short timeout or the clients will wait for more than they should, and the application ends up using a lot of CPU for polling.

When I decided to reimplement it in Go, I thought it would be as simple as this, avoiding the problem by using infinite timeout on polling:

for {
    sockets, _ := poller.Poll(-1) 
    for _, socket := range sockets {
        switch s := socket.Socket; s {
        case router:
            msg, _ := s.RecvMessage(0)
            client_id := msg[0]
            data := msg[2]
            go handleRequest(router, client_id, data)                

But that ideal implementation only works when I have a single client connected, or a light load. Under heavy load I get random assertion errors inside libzmq. I tried the following:

  1. Following the zmq4 docs I tried adding a sync.Mutex and lock/unlock on all socket operations. It fails. I assume it's because ZMQ uses its own threads for flushing.

  2. Creating one goroutine for polling/receiving and one for sending, and use channels in the same way I used the req/rep queues in the Python version. It fails, as I'm still sharing the socket.

  3. Same as 2, but setting GOMAXPROCS=1. It fails, and throughput was very limited because replies were being held back until the Poll() call returned.

  4. Use the req/rep channels as in 2, but use runtime.LockOSThread to keep all socket operations in the same thread as the socket. Has the same problem as above. It doesn't fail, but throughput was very limited.

  5. Same as 4, but using the poll timeout strategy from the Python version. It works, but has the same problem the Python version does.

  6. Share the context instead of the socket and create one socket for sending and one for receiving in separate goroutines, communicating with channels. It works, but I'll have to rewrite the client libs to use two sockets instead of one.

  7. Get rid of zmq and use raw TCP sockets, which are thread-safe. It works perfectly, but I'll also have to rewrite the client libs.

So, it looks like 6 is how ZMQ was really intended to be used, as that's the only way I got it to work seamlessly with goroutines, but I wonder if there's any other way I haven't tried. Any ideas?


With the answers here I realized I can just add an inproc PULL socket to the poller and have a goroutine connect and push a byte to break out of the infinite wait. It's not as versatile as the solutions suggested here, but it works and I can even backport it to the Python version.

  • 写回答



      相关推荐 更多相似问题


      • ¥40 python,计算机程序运行结果很奇怪
      • ¥15 有关时间计算器的问题
      • ¥200 Chrome浏览器自动保存密码需要解密!来人
      • ¥18 关于#c语言#的问题:如何把下面几个代码放在一起编序号,然后输入对应数字运行对应代码,还可以返回重输
      • ¥30 windows下无法运行go test的问题
      • ¥15 多个Rigify骨骼 导入一起 后 没有 控制器面板
      • ¥15 想问一下sprak ada语言的pre和post怎么设
      • ¥15 使用python将分子模拟输出.gro文件
      • ¥15 港诡实录的背包系统拆解
      • ¥15 宽带IP网络路由器的初始路由表和最终路由表