doudeng2184 2017-12-14 15:45
浏览 320
已采纳

ZeroMQ中的PUB / SUB模式不起作用

I am trying to implement a very basic PUB/SUB pattern using ZeroMQ. I would like to have a server (always active) broadcasting messages (publisher) to all clients and does not care about connected clients. If a clients connect to this server as a subscriber, it should receive the message.

However, I can not send the message using PUB/SUB.

In Python it would be:

# publisher (server.py)
import zmq

ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:9091')

while True:
    publisher.send_string("test")

and

# subscriber (client.py)
import zmq

ctx = zmq.Context()
subscriber = ctx.socket(zmq.SUB)
subscriber.connect('tcp://127.0.0.1:9091')

while True:
    msg = subscriber.recv_string()
    print msg

Or in golang:

package main

import (
    "github.com/pebbe/zmq4"
    "log"
    "time"
)

func Listen(subscriber *zmq4.Socket) {
    for {
        s, err := subscriber.Recv(0)
        if err != nil {
            log.Println(err)
            continue
        }
        log.Println("rec", s)
    }
}

func main() {
    publisher, _ := zmq4.NewSocket(zmq4.PUB)
    defer publisher.Close()
    publisher.Bind("tcp://*:9090")

    subscriber, _ := zmq4.NewSocket(zmq4.SUB)
    defer subscriber.Close()
    subscriber.Connect("tcp://127.0.0.1:9090")

    go Listen(subscriber)
    for _ = range time.Tick(time.Second) {
        publisher.Send("test", 0)
        log.Println("send", "test")

    }
}

Did I mis-understood this pattern or do I need to send a particular signal from the client to the server, when connecting. I am interested in the golang version and only use the python version for testing.

  • 写回答

1条回答 默认 最新

  • doufeng3602 2017-12-14 15:52
    关注

    Did I mis-understood this pattern? Yes, fortunately you did.

    ZeroMQ archetypes were defined so as to represent a certain behaviour. As said, PUSH-archetype AccessPoint pushes every message "through" all the so far setup communication channels, PULL-er AccessPoint pulls anything that has arrived down the line(s) to "it's hands", PUB-lisher AccessPoint publishes, SUB-scriber AccessPoint subscribes, so as to receive just the messages, that match it's topic-filter(s), but not any other.

    As it seems clear, such Archetype "specification" helps build the ZeroMQ smart messaging / signalling infrastructure for our ease of use in distributed-systems architectures.


    # subscriber (client.py)
    import zmq
    
    ctx        = zmq.Context()
    subscriber = ctx.socket( zmq.SUB )
    subscriber.connect( 'tcp://127.0.0.1:9091' )
    subscriber.setsockopt( zmq.LINGER,    0 )         # ALWAYS:
    subscriber.setsockopt( zmq.SUBSCRIBE, "" )        # OTHERWISE NOTHING DELIVERED
    
    while True:
        msg = subscriber.recv_string()                # MAY USE .poll() + zmq.NOBLOCK
        print msg
    

    subscriber, _ := zmq4.NewSocket( zmq4.SUB )
    subscriber.Connect(             "tcp://127.0.0.1:9090" )
    subscriber.SetSubscribe(         filter )                 // SET: <topic-filter>
    
    subscriber.SetLinger(            0 )    //  SAFETY FIRST: PREVENT DEADLOCK
    defer subscriber.Close()                //  NOW MAY SAFELY SET:
    
    ...
    msg, _ := subscriber.Recv( 0 )
    

    As defined, any right instantiated SUB-side AccessPoint object has literally zero-chance to know, what will be the choice of what messages are those right ones, so that they ought be "delivered" and what messages are not.

    Without this initial piece of knowledge, ZeroMQ designers had a principal choice to be either Archetype-policy consistent and let PUB-side AccessNode to distribute all the .send()-acquired messages only to those SUB-side AccessNode(s), that have explicitly requested to receive any such, right via the zmq.SUBSCRIBE-mechanics or to deliver everything sent from PUB also to all so far un-decided SUB-s.

    The former was a consistent and professional design step from ZeroMQ authors.
    The latter would actually mean to violate ZeroMQ own RFC-specification.

    The latter choice would be something like if one has just moved to a new apartment, one would hardly expect to find all newspapers and magazines to appear delivered in one's new mailbox from next morning on, would one? But if one subscribes to Boston Globe, the very next morning the fresh release will be at the doorstep as it will remain to be there, until one cancels the subscription or the newspaper went bankrupt or a lack of paper rolls prevented the printing shop from delivering in due time and fashion or a traffic jam in the Big Dig tunnel might have caused troubles for all or just the local delivery some one particular day.

    All this is natural and compatible with the Archetype-policy.

    Intermezzo: Golang has already bindings to many different API versions
    Technology purists will object here, that early API releases ( till some v3.2+ ) actually did technically transport all message-payloads from a PUB to all SUB-s, as it simplified PUB-side workload envelope, but increased transport-class(es) data-flow and SUB-side resources / deferred topic-filter processing. Yet all this was hidden from user-code, right by the API horizon of abstraction. So, except of a need to properly scale resources, this was transparent to user. More recent API versions reverted the role of topic-filter processor and let this to now happen on the PUB-side. Nevertheless, in both cases, the ZeroMQ RFC specification policy is implemented in such a manner, the SUB-side will never deliver ( through the .recv()-interface ) a single message, that was not matching the valid, explicit SUB-side subscription(s)

    In all cases a SUB-side has not yet explicitly set any zmq.SUBSCRIBE-instructed topic-filter, it cannot and will not deliver anything ( which is both natural and fully-consistent with the ZeroMQ RFC Archetype-policy defined for the SUB-type AccessPoint ).

    The Best Next Step:

    Always, at least, read the ZeroMQ API documentation, where all details are professionally specified - so at least, one can get a first glimpse on the breath of the smart messaging / signaling framework.

    This will not help anyone to start on a green-field and fully-build one's own complex mental-concept and indepth understanding of how all the things work internally, which is obviously not any API-documentation's ambition, is it? Yet, this will help anyone to refresh or remind about all configurable details, once one has mastered the ZeroMQ internal architecture, as detailed in the source, referred in the next paragraph.

    Plus, for anyone, who is indeed interested in distributed-systems or just zeromq per-se, it is worth one's time and efforts to always read Pieter HINTJENS' book "Code Connected, Volume 1" ( freely available in pdf ) plus any other of his books on his rich experience on software engineering later, because his many insights into modern computing may and will inspire ( and lot ).

    edit:

    MWE in GO

    package main
    
    import (
        "github.com/pebbe/zmq4"
        "log"
        "time"
    )
    
    func Listen(subscriber *zmq4.Socket) {
        for {
            s, err := subscriber.Recv(0)
            if err != nil {
                log.Println(err)
                continue
            }
            log.Println("rec", s)
        }
    }
    
    func main() {
        publisher, _ := zmq4.NewSocket(zmq4.PUB)
        publisher.SetLinger(0)
        defer publisher.Close()
    
        publisher.Bind("tcp://127.0.0.1:9092")
    
        subscriber, _ := zmq4.NewSocket(zmq4.SUB)
        subscriber.SetLinger(0)
        defer subscriber.Close()
    
        subscriber.Connect("tcp://127.0.0.1:9092")
        subscriber.SetSubscribe("")
    
        go Listen(subscriber)
        for _ = range time.Tick(time.Second) {
            publisher.Send("test", 0)
            log.Println("send", "test")
        }
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 #MATLAB仿真#车辆换道路径规划
  • ¥15 java 操作 elasticsearch 8.1 实现 索引的重建
  • ¥15 数据可视化Python
  • ¥15 要给毕业设计添加扫码登录的功能!!有偿
  • ¥15 kafka 分区副本增加会导致消息丢失或者不可用吗?
  • ¥15 微信公众号自制会员卡没有收款渠道啊
  • ¥100 Jenkins自动化部署—悬赏100元
  • ¥15 关于#python#的问题:求帮写python代码
  • ¥20 MATLAB画图图形出现上下震荡的线条
  • ¥15 关于#windows#的问题:怎么用WIN 11系统的电脑 克隆WIN NT3.51-4.0系统的硬盘