doonbfez815298 2018-12-29 12:10
浏览 441
已采纳

ZMQ无法接收来自多个发布者的消息

I'm implementing the Espresso Pattern of ZMQ.

I want to connect many subscribers <> Proxy <> many publishers

However, the listener in the proxy only receives messages from one publisher. Hence, the subscribers only receive from that particular publisher. I can't figure out what's the problem with my code.

package playground

import (
    zmq "github.com/pebbe/zmq4"

    "fmt"
    "math/rand"
    "time"
    "testing"
)

func subscriber_thread(id int) {
    subscriber, _ := zmq.NewSocket(zmq.SUB)
    subscriber.Connect("tcp://localhost:6001")
    subscriber.SetSubscribe("")
    defer subscriber.Close()

    for {
        msg, err := subscriber.RecvMessage(0)
        if err != nil {
            panic(err)
        }
        fmt.Println("subscriber id:", id,"received:", msg)
    }
}

func publisher_thread(n int) {
    publisher, _ := zmq.NewSocket(zmq.PUB)
    publisher.Bind("tcp://*:6000")

    for {
        s := fmt.Sprintf("%c-%05d", n +'A', rand.Intn(100000))
        _, err := publisher.SendMessage(s)
        if err != nil {
            panic(err)
        }
        fmt.Println("publisher sent:", s)
        time.Sleep(100 * time.Millisecond) //  Wait for 1/10th second
    }
}

//  The listener receives all messages flowing through the proxy, on its
//  pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
//  attached child threads. In other languages your mileage may vary:

func listener_thread() {
    pipe, _ := zmq.NewSocket(zmq.PAIR)
    pipe.Bind("inproc://pipe")

    //  Print everything that arrives on pipe
    for {
        msg, err := pipe.RecvMessage(0)
        if err != nil {
            break //  Interrupted
        }
        fmt.Printf("%q
", msg)
    }
}

func TestZmqEspresso(t *testing.T) {
    go publisher_thread(0)
    go publisher_thread(1)
    go publisher_thread(2)

    go subscriber_thread(1)
    go subscriber_thread(2)

    go listener_thread()

    time.Sleep(100 * time.Millisecond)

    subscriber, _ := zmq.NewSocket(zmq.XSUB)
    subscriber.Connect("tcp://localhost:6000")

    publisher, _ := zmq.NewSocket(zmq.XPUB)
    publisher.Bind("tcp://*:6001")

    listener, _ := zmq.NewSocket(zmq.PAIR)
    listener.Connect("inproc://pipe")

    zmq.Proxy(subscriber, publisher, listener)

    fmt.Println("interrupted")

}

展开全部

  • 写回答

1条回答 默认 最新

  • douyan1244 2018-12-30 06:46
    关注

    I've figured out the solution. XPUB/XSUB should bind to the socket PUB and SUB workers should connect to socket

    Working code below

    package playground
    
    import (
        zmq "github.com/pebbe/zmq4"
    
        "fmt"
        "log"
        "math/rand"
        "testing"
        "time"
    )
    
    func subscriber_thread(id int) {
        subscriber, err := zmq.NewSocket(zmq.SUB)
        if err != nil {
            panic(err)
        }
        err = subscriber.Connect("tcp://localhost:6001")
        if err != nil {
            panic(err)
        }
        err = subscriber.SetSubscribe("")
        if err != nil {
            panic(err)
        }
        defer subscriber.Close()
    
        for {
            msg, err := subscriber.RecvMessage(0)
            if err != nil {
                panic(err)
            }
            fmt.Println("subscriber id:", id, "received:", msg)
        }
    }
    
    func publisher_thread(n int) {
        publisher, err := zmq.NewSocket(zmq.PUB)
        if err != nil {
            panic(err)
        }
        //err = publisher.Bind("tcp://*:6000")
        err = publisher.Connect("tcp://localhost:6000")
        if err != nil {
            panic(err)
        }
    
        for {
            s := fmt.Sprintf("%c-%05d", n+'A', rand.Intn(100000))
            _, err := publisher.SendMessage(s)
            if err != nil {
                panic(err)
            }
            fmt.Println("publisher sent:", s)
            time.Sleep(100 * time.Millisecond) //  Wait for 1/10th second
        }
    }
    
    //  The listener receives all messages flowing through the proxy, on its
    //  pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
    //  attached child threads. In other languages your mileage may vary:
    
    func listener_thread() {
        pipe, _ := zmq.NewSocket(zmq.PAIR)
        pipe.Bind("inproc://pipe")
    
        //  Print everything that arrives on pipe
        for {
            msg, err := pipe.RecvMessage(0)
            if err != nil {
                break //  Interrupted
            }
            fmt.Printf("%q
    ", msg)
        }
    }
    
    func TestZmqEspresso(t *testing.T) {
        log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)
    
        go publisher_thread(0)
        go publisher_thread(1)
        go publisher_thread(2)
    
        go subscriber_thread(1)
        go subscriber_thread(2)
    
        go listener_thread()
    
        time.Sleep(100 * time.Millisecond)
    
        subscriber, err := zmq.NewSocket(zmq.XSUB)
        if err != nil {
            panic(err)
        }
        //err = subscriber.Connect("tcp://localhost:6000")
        err = subscriber.Bind("tcp://*:6000")
        if err != nil {
            panic(err)
        }
    
        publisher, err := zmq.NewSocket(zmq.XPUB)
        if err != nil {
            panic(err)
        }
        err = publisher.Bind("tcp://*:6001")
        if err != nil {
            panic(err)
        }
    
        listener, _ := zmq.NewSocket(zmq.PAIR)
        listener.Connect("inproc://pipe")
    
        err = zmq.Proxy(subscriber, publisher, listener)
        if err != nil {
            panic(err)
        }
    
        fmt.Println("interrupted")
    
    }
    

    展开全部

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
编辑
预览

报告相同问题?

悬赏问题

  • ¥15 距离软磁铁一定距离的磁感应强度大小怎么求
  • ¥15 霍尔传感器hmc5883l的xyz轴输出和该点的磁感应强度大小的关系是什么
  • ¥15 vscode开发micropython,import模块出现异常
  • ¥20 Excel数据自动录入表单并提交
  • ¥30 silcavo仿真,30分钟,只需要代码
  • ¥15 FastReport 怎么实现打印后马上关闭打印预览窗口
  • ¥15 利用3支股票数据估计其均值和方差的95%置信区间。
  • ¥15 微信小程序运行一项功能时,弹出未知错误弹框,检查代码没有问题
  • ¥15 ATAC测序生成self-pseudo replicates之前是否要进行去线粒体reads
  • ¥15 python模糊字匹配函数问题
手机看
程序员都在用的中文IT技术交流社区

程序员都在用的中文IT技术交流社区

专业的中文 IT 技术社区,与千万技术人共成长

专业的中文 IT 技术社区,与千万技术人共成长

关注【CSDN】视频号,行业资讯、技术分享精彩不断,直播好礼送不停!

关注【CSDN】视频号,行业资讯、技术分享精彩不断,直播好礼送不停!

客服 返回
顶部