doudeng2184 2017-12-14 15:45
Accepted

PUB/SUB pattern in ZeroMQ does not work

I am trying to implement a very basic PUB/SUB pattern using ZeroMQ. I would like to have a server (always active) that broadcasts messages (publisher) to all clients and does not care which clients are connected. If a client connects to this server as a subscriber, it should receive the messages.

However, I cannot get the message across using PUB/SUB.

In Python it would be:

# publisher (server.py)
import zmq

ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:9091')

while True:
    publisher.send_string("test")

and

# subscriber (client.py)
import zmq

ctx = zmq.Context()
subscriber = ctx.socket(zmq.SUB)
subscriber.connect('tcp://127.0.0.1:9091')

while True:
    msg = subscriber.recv_string()
    print msg

Or in golang:

package main

import (
    "github.com/pebbe/zmq4"
    "log"
    "time"
)

func Listen(subscriber *zmq4.Socket) {
    for {
        s, err := subscriber.Recv(0)
        if err != nil {
            log.Println(err)
            continue
        }
        log.Println("rec", s)
    }
}

func main() {
    publisher, _ := zmq4.NewSocket(zmq4.PUB)
    defer publisher.Close()
    publisher.Bind("tcp://*:9090")

    subscriber, _ := zmq4.NewSocket(zmq4.SUB)
    defer subscriber.Close()
    subscriber.Connect("tcp://127.0.0.1:9090")

    go Listen(subscriber)
    for _ = range time.Tick(time.Second) {
        publisher.Send("test", 0)
        log.Println("send", "test")

    }
}

Did I misunderstand this pattern, or do I need to send a particular signal from the client to the server when connecting? I am interested in the golang version and only use the python version for testing.

1 answer

  • doufeng3602 2017-12-14 15:52

    Did I misunderstand this pattern? Yes, fortunately you did.

    Each ZeroMQ archetype is defined to represent a certain behaviour. As the names say, a PUSH-archetype AccessPoint pushes each message down one of the communication channels set up so far (round-robin among the connected peers), a PULL-er AccessPoint pulls anything that has arrived down the line(s) into its hands, a PUB-lisher AccessPoint publishes, and a SUB-scriber AccessPoint subscribes, so as to receive just the messages that match its topic-filter(s), but not any other.

    Such an Archetype "specification" is what lets us build on ZeroMQ's smart messaging / signalling infrastructure with ease in distributed-systems architectures.


    # subscriber (client.py)
    import zmq
    
    ctx        = zmq.Context()
    subscriber = ctx.socket( zmq.SUB )
    subscriber.connect( 'tcp://127.0.0.1:9091' )
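    subscriber.setsockopt( zmq.LINGER,    0 )         # ALWAYS: do not block on close() waiting for unsent messages
    subscriber.setsockopt( zmq.SUBSCRIBE, b"" )       # empty prefix matches everything; OTHERWISE NOTHING IS DELIVERED
    
    while True:
        msg = subscriber.recv_string()                # blocking; MAY USE .poll() + zmq.NOBLOCK instead
        print( msg )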
    

    subscriber, _ := zmq4.NewSocket( zmq4.SUB )
    subscriber.Connect(             "tcp://127.0.0.1:9090" )
    subscriber.SetSubscribe(         filter )                 // SET: <topic-filter>, a string prefix; "" matches every message
    
    subscriber.SetLinger(            0 )    //  SAFETY FIRST: PREVENT Close() FROM BLOCKING ON UNSENT MESSAGES
    defer subscriber.Close()                //  WITH LINGER SET TO 0, Close() RETURNS IMMEDIATELY
    
    ...
    msg, _ := subscriber.Recv( 0 )
    

    By definition, a freshly instantiated SUB-side AccessPoint object has literally zero chance of knowing which messages are the right ones to be "delivered" and which are not.

    Without this initial piece of knowledge, the ZeroMQ designers had a principal choice. They could either stay consistent with the Archetype-policy and let the PUB-side AccessNode distribute the .send()-acquired messages only to those SUB-side AccessNode(s) that have explicitly requested to receive them via the zmq.SUBSCRIBE mechanics, or they could deliver everything sent from the PUB to all SUBs, including those that have not yet decided what they want.

    The former was a consistent and professional design step by the ZeroMQ authors.
    The latter would actually mean violating ZeroMQ's own RFC specification.

    The latter choice would be like moving to a new apartment and expecting every newspaper and magazine to appear in one's new mailbox from the next morning on. One would hardly expect that, would one? But if one subscribes to the Boston Globe, the very next morning the fresh issue will be at the doorstep, and it will keep arriving until one cancels the subscription, or the newspaper goes bankrupt, or a lack of paper rolls prevents the printing shop from delivering on time, or a traffic jam in the Big Dig tunnel disrupts delivery for everyone, or just for the local round, on one particular day.

    All this is natural and compatible with the Archetype-policy.

    Intermezzo: Golang already has bindings to many different API versions
    Technology purists will object here that early API releases (v2.x, with the change arriving in the v3.x line) did technically transport all message payloads from a PUB to all SUBs. This simplified the PUB-side workload, but it increased the data flow over the transport class(es) and moved the topic-filter processing, with its resource cost, to the SUB side. Yet all this was hidden from user code by the API's horizon of abstraction, so apart from the need to scale resources properly, it was transparent to the user. More recent API versions reversed the role of topic-filter processor and let filtering happen on the PUB side. Nevertheless, in both cases the ZeroMQ RFC specification policy is implemented so that the SUB side never delivers (through the .recv() interface) a single message that does not match a valid, explicit SUB-side subscription.

    Whenever a SUB side has not yet explicitly set a topic-filter via zmq.SUBSCRIBE, it cannot and will not deliver anything (which is both natural and fully consistent with the ZeroMQ RFC Archetype-policy defined for the SUB-type AccessPoint).
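
    To make the trade-off between the two filtering locations concrete, here is a small stand-alone model in plain Go. It does not use ZeroMQ at all; the names `stats`, `wanted`, `subSideFiltering` and `pubSideFiltering` are hypothetical and exist only for this illustration. It assumes a single subscriber and counts how many messages travel over the transport versus how many reach user code.

```go
package main

import (
	"fmt"
	"strings"
)

// stats counts messages put on the transport and messages handed to user code.
// (Hypothetical type for illustration only; not part of any ZeroMQ binding.)
type stats struct{ onWire, delivered int }

// wanted reports whether msg starts with at least one subscribed prefix.
func wanted(filters []string, msg string) bool {
	for _, f := range filters {
		if strings.HasPrefix(msg, f) {
			return true
		}
	}
	return false
}

// subSideFiltering models the older behaviour: everything is transported,
// and the subscriber drops what it did not ask for.
func subSideFiltering(msgs, filters []string) stats {
	var s stats
	for _, m := range msgs {
		s.onWire++
		if wanted(filters, m) {
			s.delivered++
		}
	}
	return s
}

// pubSideFiltering models the newer behaviour: the publisher only
// transports what the subscriber asked for.
func pubSideFiltering(msgs, filters []string) stats {
	var s stats
	for _, m := range msgs {
		if wanted(filters, m) {
			s.onWire++
			s.delivered++
		}
	}
	return s
}

func main() {
	msgs := []string{"weather.boston", "sports.final", "weather.nyc", "news.local"}
	filters := []string{"weather"}
	fmt.Println(subSideFiltering(msgs, filters)) // {4 2}
	fmt.Println(pubSideFiltering(msgs, filters)) // {2 2}
}
```

    In this model both strategies deliver the same two messages to user code; only the traffic on the wire differs, which is why the change was invisible at the API level.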
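
    The delivery rule itself can be sketched in a few lines of plain Go. This is only a model of the documented behaviour (a subscription is a prefix match on the message, and an empty prefix matches everything), not code from ZeroMQ or zmq4; the helper name `matches` is an assumption made for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// matches models the SUB-side delivery rule: a message is delivered only
// if it starts with at least one subscribed prefix.
// (Hypothetical helper for illustration; not part of zmq4.)
func matches(filters []string, msg string) bool {
	for _, f := range filters {
		if strings.HasPrefix(msg, f) {
			return true
		}
	}
	return false
}

func main() {
	// No subscription at all: nothing is delivered (the situation in the question).
	fmt.Println(matches(nil, "test")) // false

	// Empty-string subscription: every message matches.
	fmt.Println(matches([]string{""}, "test")) // true

	// A topic prefix selects only messages that start with it.
	fmt.Println(matches([]string{"weather"}, "weather.boston")) // true
	fmt.Println(matches([]string{"weather"}, "sports.final"))   // false
}
```

    The first case is exactly what the code in the question does: a SUB socket with no subscription set, so "test" is never handed to .recv().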

    The Best Next Step:

    Always, at the very least, read the ZeroMQ API documentation, where all details are professionally specified, so that one can get a first glimpse of the breadth of the smart messaging / signalling framework.

    This will not help anyone starting on a green field to build a complete mental concept and an in-depth understanding of how all the things work internally, which is obviously not the ambition of any API documentation, is it? Yet it will help anyone refresh or recall all the configurable details, once one has mastered the ZeroMQ internal architecture as detailed in the source referred to in the next paragraph.

    Plus, for anyone who is indeed interested in distributed systems, or just in zeromq per se, it is worth one's time and effort to read Pieter HINTJENS' book "Code Connected, Volume 1" (freely available as a PDF), and later any of his other books on his rich experience in software engineering, because his many insights into modern computing may and will inspire (a lot).

    edit:

    MWE in GO

    package main
    
    import (
        "github.com/pebbe/zmq4"
        "log"
        "time"
    )
    
    func Listen(subscriber *zmq4.Socket) {
        for {
            s, err := subscriber.Recv(0)
            if err != nil {
                log.Println(err)
                continue
            }
            log.Println("rec", s)
        }
    }
    
    func main() {
        publisher, err := zmq4.NewSocket(zmq4.PUB)
        if err != nil {
            log.Fatal(err)
        }
        publisher.SetLinger(0) // do not block on Close() waiting for unsent messages
        defer publisher.Close()

        if err := publisher.Bind("tcp://127.0.0.1:9092"); err != nil {
            log.Fatal(err)
        }

        subscriber, err := zmq4.NewSocket(zmq4.SUB)
        if err != nil {
            log.Fatal(err)
        }
        subscriber.SetLinger(0)
        defer subscriber.Close()

        subscriber.Connect("tcp://127.0.0.1:9092")
        subscriber.SetSubscribe("") // empty prefix: receive every message

        go Listen(subscriber)
        for range time.Tick(time.Second) {
            publisher.Send("test", 0)
            log.Println("send", "test")
        }
    }
    
