douxishai8552 2016-04-05 21:57

How to deal with the lack of thread safety in ZMQ sockets?

I've been using ZMQ in some Python applications for a while, but only very recently I decided to reimplement one of them in Go and I realized that ZMQ sockets are not thread-safe.

The original Python implementation uses an event loop that looks like this:

while running:
    # Wait up to TIMEOUT ms for an incoming request.
    socks = dict(poller.poll(TIMEOUT))
    if socks.get(router) == zmq.POLLIN:
        # ROUTER frames: [client_id, empty delimiter, data]
        client_id, _, data = router.recv_multipart()
        requests.append((client_id, data))

    # Iterate over a copy so handled requests can be removed safely.
    for req in list(requests):
        rep = handle_request(req)
        if rep:
            replies.append(rep)
            requests.remove(req)

    for client_id, data in replies:
        router.send_multipart([client_id, b'', data])
    # Clear the reply queue only after every reply has been sent.
    del replies[:]

The problem is that a reply might not be ready on the first pass, so whenever I have pending requests I have to poll with a very short timeout, or the clients will wait longer than they should. As a result, the application burns a lot of CPU on polling.

When I decided to reimplement it in Go, I thought it would be as simple as this, avoiding the problem by using infinite timeout on polling:

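for {
    // Block until a socket is readable (no timeout).
    sockets, _ := poller.Poll(-1)
    for _, socket := range sockets {
        switch s := socket.Socket; s {
        case router:
            // ROUTER frames: [client_id, empty delimiter, data]
            msg, _ := s.RecvMessage(0)
            client_id := msg[0]
            data := msg[2]
            // The handler gets the shared router socket and runs in its own goroutine.
            go handleRequest(router, client_id, data)
        }
    }
}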

But that ideal implementation only works when I have a single client connected, or a light load. Under heavy load I get random assertion errors inside libzmq. I tried the following:

  1. Following the zmq4 docs, I tried adding a sync.Mutex and locking/unlocking around all socket operations. It fails. I assume it's because ZMQ uses its own threads for flushing.

  2. Creating one goroutine for polling/receiving and one for sending, connected by channels used the same way as the req/rep queues in the Python version. It fails, as I'm still sharing the socket.

  3. Same as 2, but setting GOMAXPROCS=1. It fails, and throughput was very limited because replies were being held back until the Poll() call returned.

  4. Using the req/rep channels as in 2, but with runtime.LockOSThread to keep all socket operations in the same thread as the socket. It doesn't fail, but it has the same throughput problem as 3: replies are held back until Poll() returns.

  5. Same as 4, but using the poll timeout strategy from the Python version. It works, but has the same problem the Python version does.

  6. Share the context instead of the socket and create one socket for sending and one for receiving in separate goroutines, communicating with channels. It works, but I'll have to rewrite the client libs to use two sockets instead of one.

  7. Get rid of zmq and use raw TCP sockets, which are thread-safe. It works perfectly, but I'll also have to rewrite the client libs.

So, it looks like 6 is how ZMQ was really intended to be used, as that's the only way I got it to work seamlessly with goroutines, but I wonder if there's any other way I haven't tried. Any ideas?


Update

With the answers here I realized I can just add an inproc PULL socket to the poller and have a goroutine connect and push a byte to break out of the infinite wait. It's not as versatile as the solutions suggested here, but it works and I can even backport it to the Python version.
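The wake-up idea can be modelled with plain Go channels so that it runs without libzmq. This is only a rough sketch under stated assumptions, not actual zmq4 code: the `select` stands in for `poller.Poll(-1)`, the `wake` channel stands in for the inproc PULL socket, and `reply`, `replyQueue` and `serve` are hypothetical names. The point it shows is that workers never touch the router. They queue a reply and push one byte, and only the loop goroutine sends.

```go
package main

import (
	"fmt"
	"sync"
)

// reply pairs a client identity with a payload (hypothetical type).
type reply struct {
	clientID string
	data     string
}

// replyQueue is a mutex-protected queue filled by worker goroutines.
type replyQueue struct {
	mu    sync.Mutex
	items []reply
}

func (q *replyQueue) push(r reply) {
	q.mu.Lock()
	q.items = append(q.items, r)
	q.mu.Unlock()
}

// drain returns all queued replies and empties the queue.
func (q *replyQueue) drain() []reply {
	q.mu.Lock()
	defer q.mu.Unlock()
	out := q.items
	q.items = nil
	return out
}

// serve models the event loop. It returns after n replies have been
// "sent"; sending is modelled by appending to a slice that only this
// goroutine touches.
func serve(router <-chan reply, n int) []string {
	var q replyQueue
	var sent []string
	// Stand-in for the inproc PULL socket; buffered so workers never block.
	wake := make(chan byte, n)
	for len(sent) < n {
		select { // stand-in for an infinite-timeout poll on both sockets
		case req := <-router:
			go func(req reply) {
				q.push(reply{clientID: req.clientID, data: "re:" + req.data})
				wake <- 1 // one byte breaks the loop out of its wait
			}(req)
		case <-wake:
			for _, r := range q.drain() {
				sent = append(sent, r.clientID+" "+r.data)
			}
		}
	}
	return sent
}

func main() {
	router := make(chan reply, 2)
	router <- reply{clientID: "client-1", data: "ping"}
	router <- reply{clientID: "client-2", data: "pong"}
	for _, s := range serve(router, 2) {
		fmt.Println(s) // order depends on goroutine scheduling
	}
}
```

With real sockets, each worker would need its own PUSH socket connected to the inproc endpoint, since the sockets themselves still must not be shared between goroutines.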

2 answers

  • duanpin5168 2016-04-05 22:35

    I opened an issue about 1.5 years ago to introduce a port of https://github.com/vaughan0/go-zmq/blob/master/channels.go to pebbe/zmq4. Ultimately the author decided against it, but we have used this in production (under VERY heavy workloads) for a long time now.

    This is a gist of the file that had to be added to the pebbe/zmq4 package (since it adds methods to the Socket). This could be re-written in such a way that the methods on the Socket receiver instead took a Socket as an argument, but since we vendor our code anyway, this was an easy way forward.

    The basic usage is to create your Socket like normal (call it s for example) then you can:

    channels := s.Channels()
    outBound := channels.Out()
    inBound := channels.In()
    

    Now you have two channels of type [][]byte that you can use between goroutines, while a single goroutine, managed within the channels abstraction, is responsible for managing the Poller and communicating with the socket.
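As a rough illustration of how request handlers might consume such channels, here is a sketch that uses only the standard library. Plain channels stand in for `channels.In()` and `channels.Out()`, the frame layout `[client_id, empty delimiter, data]` is taken from the question, and `startWorkers` and `handleRequest` are hypothetical names, not part of the gist or of zmq4.

```go
package main

import (
	"bytes"
	"fmt"
)

// handleRequest is a hypothetical stand-in for the application logic.
func handleRequest(data []byte) []byte {
	return bytes.ToUpper(data)
}

// startWorkers launches n goroutines that read multipart messages from in
// and write multipart replies to out. No worker touches a socket directly;
// the goroutine behind the channels is assumed to own it.
func startWorkers(n int, in <-chan [][]byte, out chan<- [][]byte) {
	for i := 0; i < n; i++ {
		go func() {
			for msg := range in {
				if len(msg) < 3 {
					continue // not [client_id, delimiter, data]; skip it
				}
				out <- [][]byte{msg[0], {}, handleRequest(msg[2])}
			}
		}()
	}
}

func main() {
	// Plain channels stand in for channels.In() and channels.Out().
	in := make(chan [][]byte, 1)
	out := make(chan [][]byte, 1)
	startWorkers(4, in, out)

	in <- [][]byte{[]byte("client-1"), {}, []byte("ping")}
	rep := <-out
	fmt.Printf("%s -> %s\n", rep[0], rep[2]) // prints: client-1 -> PING
}
```

Because the workers only see channels, any number of them can run concurrently without locking, which is the property the question was looking for.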

    This answer was selected as the best answer by the asker.