林 先森 2018-06-19 02:56 采纳率: 0%
浏览 4120
已结题

RabbitMQ 插件MQTT使用 mqtt需要大量连接设备 for Java

RabbitMQ 插件MQTT使用 mqtt需要大量连接设备 for Java
使用RabbitMQ作为mqtt的服务器,如何管理mqtt创建大量连接进行发布与订阅消息。(云端服务器)设备与云端服务器建立长连接

  • 写回答

2条回答 默认 最新

  • 我姓徐 2018-06-19 07:37
    关注

    rabbitmq中有实现MQTT的插件,具体介绍参考下面的链接

    http://www.rabbitmq.com/mqtt.html

    安装插件

    根据官网的介绍,只要启用rabbitmq的MQTT插件即可:
    
       rabbitmq-plugins enable rabbitmq_mqtt
    
    启用后重启一下rabbitmq-server就可以了
    

    MQTT消息的收发--python实现

    在众多MQTT broker的实现中,mosquitto(蚊子)是很著名的一个。
    
    根据下面链接中的介绍,rabbitmq的mqtt插件,跟mosquitto有互操作性(Interoperability)
    
    http://www.rabbitmq.com/blog/2012/09/12/mqtt-adapter/
    
    
    
    python客户端库的获得
    
        首先,需要下载python的客户端库,从下面这个链接中可以下载到mosquitto
    
            http://mosquitto.org/download/
    
        下载完成后解压,python的库在源码目录下的./lib/python目录下,安装
    
            python setup.py build
    
            python setup.py install
    
    
    
    消息收发实现
    
        上面下载的mosquitto的源码目录./lib/python下,有一个例子实现sub.py
    
        http://mosquitto.org/documentation/python/
    
        上面的链接中有如何用python收发消息的介绍,我再稍微总结一下
    
         要实现消息的接收,总共有下面几步
    
             1. 创建客户端,注册callback函数
    
             2. 订阅消息(设定topic)
    
             3. 循环处理
    
          消息订阅时(函数原型:subscribe(self, topic, qos=0)),会提供要订阅消息的topic,和期望服务质量(https://www.ibm.com/developerworks/cn/webservices/ws-mqtt/index.html,该连接中说明的’至多一次‘,’至少一次‘,‘只有一次’的三种服务质量)。
    
         循环处理中遇到事件时,会调用一开始注册好的callback函数。根据网页的介绍,事件有下面这几种:
    
              Connect callback
    
              Disconnect callback
    
              Publish callback
    
              Message callback
    
              Subscribe callback
    
              Unsubscribe callback
    
          关于上面的Callback介绍,链接中都有比较详细的说明,请参考。
    
          在这个客户端的实现中,loop()方法的调用非常重要,根据介绍中说明的,loop方法的作用是处理来来往往(incoming and outgoing)的网络数据,并且需要被频繁调用。链接中说,该函数有一个参数指定了需要等待的毫秒数(默认值为1),但是帮助中的原型却是:loop(self, timeout=1.0, max_packets=1),并且timeout为秒数。
    
          原型具体是什么暂且不论,loop到底干了什么呢,帮助中的描述说,loop调用select来监听socket事件,对于收到的数据,loop对其进行处理(通过调用上面定义的callback函数);对于发送函数发送的数据,loop来善后。下面是loop的帮助说明:
    
               This function must be called regularly to ensure communication with the broker is carried out. It calls select() on the network socket to wait  for network events. If incoming data is present it will then be processed. Outgoing commands, from e.g. publish(), are normally sent immediately that their function is called, but this is not always possible. loop() will also attempt to send any remaining outgoing messages, which also includes commands that are part of the flow for messages with QoS>0
    
          官网中给出的sub.py的例子中,最后调用了loop_forever()函数,这个函数的帮助中是这么描述的:
    
               This function call loop() for you in an infinite blocking loop. It is useful for the case where you only want to run the MQTT client loop in your program.
    
               loop_forever() will handle reconnecting for you. If you call disconnect() in a callback it will return.
    
          loop_forever()方法能够帮你调用loop方法,并且给你做重新连接处理,非常的方便。如果不是通过该方法调用loop,loop在一次会话后就返回了(测试是这样)。
    

    代码:

    sub.py的如下,我给添加了点注释(其实里面的注释已经很详细了),从这里面就能看出MQTT客户端的使用方法

    [python] view plain copy

    导入mosquitto的python库

    import mosquitto

    定义自己的callback 函数

    def on_connect(mosq, obj, rc):

    mosq.subscribe("$SYS/#", 0)

    print("rc: "+str(rc))

    def on_message(mosq, obj, msg):

    print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

    def on_publish(mosq, obj, mid):

    print("mid: "+str(mid))

    def on_subscribe(mosq, obj, mid, granted_qos):

    print("Subscribed: "+str(mid)+" "+str(granted_qos))

    def on_log(mosq, obj, level, string):

    print(string)

    创建客户端

    If you want to use a specific client id, use

    mqttc = mosquitto.Mosquitto("client-id")

    but note that the client id must be unique on the broker. Leaving the client

    id parameter empty will generate a random id for you.

    mqttc = mosquitto.Mosquitto()

    注册callback函数

    mqttc.on_message = on_message

    mqttc.on_connect = on_connect

    mqttc.on_publish = on_publish

    mqttc.on_subscribe = on_subscribe

    Uncomment to enable debug messages

    #mqttc.on_log = on_log

    连接服务器

    mqttc.connect("test.mosquitto.org", 1883, 60)

    订阅消息

    #mqttc.subscribe("string", 0)

    #mqttc.subscribe(("tuple", 1))

    #mqttc.subscribe([("list0", 0), ("list1", 1)])

    无限循环处理

    mqttc.loop_forever()

    要想更清楚的了解中间的原理,研究一下MQTT协议本身(轻量级的协议,也不是很难),和读一下mosquitto的源码(总共就2千行)就可以了。

    评论

报告相同问题?

悬赏问题

  • ¥15 求差集那个函数有问题,有无佬可以解决
  • ¥15 【提问】基于Invest的水源涵养
  • ¥20 微信网友居然可以通过vx号找到我绑的手机号
  • ¥15 寻一个支付宝扫码远程授权登录的软件助手app
  • ¥15 解riccati方程组
  • ¥15 display:none;样式在嵌套结构中的已设置了display样式的元素上不起作用?
  • ¥15 使用rabbitMQ 消息队列作为url源进行多线程爬取时,总有几个url没有处理的问题。
  • ¥15 Ubuntu在安装序列比对软件STAR时出现报错如何解决
  • ¥50 树莓派安卓APK系统签名
  • ¥65 汇编语言除法溢出问题