weixin_39610468
2020-12-09 12:43 阅读 18

物联网宠儿mqtt.js那些事儿

image

常见的mq有Kafka,RocketMQ和RabbitMQ,大家也很常见。 前者很常见,属于微服务间的mq。那么MQTT是什么呢?MQTT属于IoT也就是物联网的概念。 快来和使用mqtt.js开发IM功能2年的作者一探究竟吧~

先来看下MQTT在物联网领域的应用场景: image

mqtt.js是MQTT在nodejs端的实现。 通过npm package.json包管理,现代vue技术栈下的前端也可用,比如用vue-cli,create-react-app等等构建的项目。

mqtt.js官方为微信小程序和支付宝小程序也做了支持。微信小程序的MQTT协议名为wxs,支付宝小程序则是alis

如果还是一脸懵逼,那么就跟随我通过mqtt.js去认识一下这个物联网领域的宠儿吧。

  • 什么是微消息队列?
  • MQTT关键名词解释
  • P2P消息和Pub/Sub消息
  • 封装的mqtt.js通用class
  • 客户端发包函数sendPacket
  • 客户端连接 mqtt.connect()
  • 订阅topic mqtt.Client#subscribe()
  • 发送消息 mqtt.Client#publish()
  • 接收消息 mqtt.Client#“message”事件

该提问来源于开源项目:FrankKai/FrankKai.github.io

  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享

9条回答 默认 最新

  • weixin_39610468 weixin_39610468 2020-12-09 12:43

    MQTT关键名词解释

    实例(Instance)

    每个MQTT实例都对应一个全局唯一的服务接入点。 肉眼可见的区别就是在通过mqtt.connect(url)与server(broker)建立连接时,broker的url都是一致的。 假设有saleman1,salesman2···他们本地的前端与服务端间建立连接的url都是统一的,只是在clientId进行区分即可。

    客户端Id(Client ID)

    MQTT的Client ID是每个客户端的唯一标识,要求全局都是唯一的,使用同一个Client ID连接会被拒绝。 阿里云的ClientID由两部分组成<GroupID>@@@<DeviceID>。 通常情况下Group ID是多前端统一的,比如PC端,安卓移动端,ios移动端,DeviceID也是多前端统一的。 那么如何区分多端呢?可以对Client ID中间的@@。 比如:

    js
    let CID_PC = `<groupid>@@@-PC<deviceid>`
    let CID_Android = `<groupid>@@@-Android<deviceid>`
    let CID_IOS = `<groupid>@@@-IOS<deviceid>`
    </deviceid></groupid></deviceid></groupid></deviceid></groupid>

    组Id(Group ID)

    用于指定一组逻辑功能完全一致的节点公用的组名,代表的是一类相同功能的设备。

    Device ID

    每个设备独一无二的标识。这个需要保证全局唯一,可以是每个传感器设备的序列号,可以是登录PC的userId。

    父主题(Parent Topic)

    MQTT协议基于Pub/Sub模型,任何消息都属于一个Topic。 Topic可以存在多级,第一级为父级Topic。 需要控制台单独创建。

    子主题(Subtopic)

    MQTT可以有二级Topic,也可以有三级Topic。 无需创建,代码中直接写即可。

    点赞 评论 复制链接分享
  • weixin_39610468 weixin_39610468 2020-12-09 12:43

    P2P消息和Pub/Sub消息

    Pub/Sub消息就是订阅和发布的模式,类似事件监听和广播。 如果对发布订阅不理解,可以去看Webhook到底是个啥? MQTT除了支持Pub/Sub的模式,还支持P2P的模式。

    什么是P2P消息?

    • P2P,全称为(Point to Point)。
    • 一对一的消息收发模式,只有一个消息发送者和一个消息接收者。
    • P2P模式下,消息发送者明确知道消息的预期接收者,并且这个消息只能被这个特定的客户端消费
    • 发送者发送消息时,通过Topic指定接收者,接收者无需订阅即可获得该消息。
    • P2P 模式不仅降低注册订阅的成本,而且因为对链路有优化,所以降低推送延迟。

    P2P模式和Pub/Sub模式的区别

    发送消息时 - Pub/Sub模式下,发送者需要按照与接受者约定好的Topic发送消息 - P2P模式下,发送者无需按照Tpic发送,可以直接按照规范进行发送

    接收消息时 - Pub/Sub模式下,接收者需要提前订阅topic才能接消息 - P2P模式下无需订阅即可接收消息

    nodejs发送P2P消息

    js
    const p2pTopic =topic+"/p2p/GID_xxxx@@";
    mqtt.client.publish(p2pTopic);
    
    点赞 评论 复制链接分享
  • weixin_39610468 weixin_39610468 2020-12-09 12:43

    封装的mqtt.js通用class

    • 客户端连接 initClient(config)
    • 订阅topic subscribeTopic(topic, config)
    • 发送消息 publishMessage(message)
    • 接收消息 handleMessage(callback)
    js
    import mqtt from 'mqtt';
    import config from '@/config';
    
    export default class MQTT {
      constructor(options) {
        this.name = options.name;
        this.connecting = false;
      }
      /**
       * 客户端连接
       */
      initClient(config) {
        const { url, groupId, key, password, topic: { publish: publishTopic }} = config;
        return new Promise((resolve) => {
          this.client = mqtt.connect(
            {
              url,
              clientId: `${groupId}@@@${deviceId}`,
              username: key,
              password,
            }
          );
          this.client.on('connect', () => {
            this.connecting = true;
            resolve(this);
          });
        });
      }
    
      /**
       * 订阅topic
       */
      subscribeTopic(topic, config) {
        if (this.connecting) {
          this.client.subscribe(topic, config);
        }
        return this;
      }
    
      /**
       * 发送消息
       */
      publishMessage(message) {
        this.client.publish(publishTopic, message, { qos: 1 });
      }
    
      /**
       * 接收消息
       */
      handleMessage(callback) {
        if (!this.client._events.message) {
          this.client.on('message', callback);
        }
      }
    
    }
    
    点赞 评论 复制链接分享
  • weixin_39610468 weixin_39610468 2020-12-09 12:43

    客户端发包函数sendPacket

    mqtt-packet生成一个可传输buffer

    js
    var mqtt = require('mqtt-packet')
    var object = {
      cmd: 'publish',
      retain: false,
      qos: 0,
      dup: false,
      length: 10,
      topic: 'test',
      payload: 'test' // Can also be a Buffer
    }
    var opts = { protocolVersion: 4 } // default is 4. Usually, opts is a connect packet
    
    console.log(mqtt.generate(object))
    // Prints:
    //
    // <buffer>
    //
    // Which is the same as:
    //
    // new Buffer([
    //   48, 10, // Header (publish)
    //   0, 4, // Topic length
    //   116, 101, 115, 116, // Topic (test)
    //   116, 101, 115, 116 // Payload (test)
    // ])
    </buffer>

    sendPacket函数

    发出packetsend事件并且通过mqtt.writeToStream将packet写入client的stream中。

    js
    var mqttPacket = require('mqtt-packet')
    
    function sendPacket (client, packet) {
      client.emit('packetsend', packet)
      mqttPacket.writeToStream(packet, client.stream, client.options)
    }
    

    _sendPack方法

    js
    MqttClient.prototype._sendPacket = function (packet) {
         sendPacket(this, packet);
    }
    
    点赞 评论 复制链接分享
  • weixin_39610468 weixin_39610468 2020-12-09 12:43

    客户端连接 mqtt.connect()

    mqtt client建立与mqtt server(broker)的连接,通常是通过给定一个'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'为协议的url进行连接。

    js
    mqtt.connect([url], options)
    

    官方说明: - 通过给定的url和配置连接到一个broker,并且返回一个Client。 - url可以遵循以下协议:'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'。(mqtt.js支持微信小程序和支付宝小程序,协议分别为wxs和alis。) - url也可以是通过URL.parse()返回的对象。 - 可以传入一个单对象,既包含url又包含选项。

    再来看一下我手上项目的连接配置,连接结果。 敏感信息已通过foo,bar,baz或者xxxx的组合进行数据脱敏处理。

    连接配置

    js
     {
        key: 'xxxxxxxx',
        secret: 'xxxxxxxx',
        url: 'wss://foo-bar.mqtt.baz.com/mqtt',
        groupId: 'FOO_BAR_BAZ_GID',
        topic: {
          publish: 'PUBLISH_TOPIC',
          subscribe: ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'],
          unsubscribe: 'SUBSCRIBE_TOPIC/noticeMobile/',
        },
    }
    
    • key 账号
    • secret 密码
    • url 用于建立client与server(broker)mqtt连接的链接
    • groupId 组id
    • topic 发送消息的topic,订阅的topic,取消订阅的topic

    连接结果

    包括总览,响应头和请求头。

    General
    
    Request URL: wss://foo-bar.mqtt.baz.com
    Request Method: GET
    Status Code: 101 Switching Protocols
    
    Response Header
    
    HTTP/1.1 101 Switching Protocols
    upgrade: websocket
    connection: upgrade
    sec-websocket-accept: xxxxxxx
    sec-websocket-protocol: mqtt
    
    Request Header
    
    GET wss://foo-bar.mqtt.baz.com/ HTTP/1.1
    Host: foo-bar.mqtt.baz.com
    Connection: Upgrade
    Pragma: no-cache
    Cache-Control: no-cache
    User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36
    Upgrade: websocket
    Origin: https://xxx.xxx.com
    Sec-WebSocket-Version: 13
    Accept-Encoding: gzip, deflate, br
    Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6
    Sec-WebSocket-Key: xxxxxxxxx
    Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
    Sec-WebSocket-Protocol: mqtt
    
    源码分析

    下面来看这段mqtt连接的代码。

    js
    this.client = mqtt.connect(
      {
        url,
        clientId: `${groupId}@@@${deviceId}`,
        username: key,
        password,
      }
    );
    
    js
    function parseAuthOptions (opts) {
      var matches
      if (opts.auth) {
        matches = opts.auth.match(/^(.+):(.+)$/)
        if (matches) {
          opts.username = matches[1]
          opts.password = matches[2]
        } else {
          opts.username = opts.auth
        }
      }
    }
    /**
     * connect - connect to an MQTT broker.
     *
     *  {String} [brokerUrl] - url of the broker, optional
     *  {Object} opts - see MqttClient#constructor
     */
    function connect (brokerUrl, opts) {
      if ((typeof brokerUrl === 'object') && !opts) {
        //  可以传入一个单对象,既包含url又包含选项
        opts = brokerUrl
        brokerUrl = null
      }
      opts = opts || {}
      // 设置username和password
      parseAuthOptions(opts)
      if (opts.query && typeof opts.query.clientId === 'string') {
        // 设置Client Id
        opts.clientId = opts.query.clientId
      }
      function wrapper (client) {
       ...
        return protocols[opts.protocol](client, opts)
      }
      // 最终返回一个mqtt client实例
      return new MqttClient(wrapper, opts)
    }
    
    点赞 评论 复制链接分享
  • weixin_39610468 weixin_39610468 2020-12-09 12:43

    订阅topic mqtt.Client#subscribe()

    实际代码

    js
    const topic =  {
          subscribe: ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'],
          unsubscribe: 'SUBSCRIBE_TOPIC/noticeMobile/',
    };
    const config = { qos:1 };
    this.client.subscribe(topic.subscribe, config)
    

    源码分析

    js
    MqttClient.prototype.subscribe = function () {
      var packet
      var args = new Array(arguments.length)
      for (var i = 0; i < arguments.length; i++) {
        args[i] = arguments[i]
      }
      var subs = []
       // obj为订阅的topic列表
      var obj = args.shift()
      // qos等配置
      var opts = args.pop()
      var defaultOpts = {
        qos: 0
      }
      opts = xtend(defaultOpts, opts)
      // 数组类型的订阅的topic列表  
      if (Array.isArray(obj)) {
        obj.forEach(function (topic) {
          if (!that._resubscribeTopics.hasOwnProperty(topic) ||
            that._resubscribeTopics[topic].qos < opts.qos ||
              resubscribe) {
            var currentOpts = {
              topic: topic,
              qos: opts.qos
            }
            // subs是最终的订阅的topic列表
            subs.push(currentOpts)
          }
        })
      }
      // 这个packet很重要
      packet = {
        // 发出订阅命令
        cmd: 'subscribe',
        subscriptions: subs,
        qos: 1,
        retain: false,
        dup: false,
        messageId: this._nextId()
      }
      // 发出订阅包
      this._sendPacket(packet)
      return this
    }
    
    点赞 评论 复制链接分享
  • weixin_39610468 weixin_39610468 2020-12-09 12:43

    发送消息 mqtt.Client#publish()

    实际代码

    js
    const topic = {
          publish: 'PUBLISH_TOPIC',
    };
    const messge = {
       foo: '',
       bar: '',
       baz: '',
       ...
    }
    const msgStr = JSON.stringify(message);
    this.client.publish(topic.publish, msgStr);
    

    注意publish的消息需要使用JSON.stringify进行序列化,然后再发到指定的topic。

    源码分析

    js
    MqttClient.prototype.publish = function (topic, message, opts, callback) {
      var packet
      var options = this.options
      var defaultOpts = {qos: 0, retain: false, dup: false}
      opts = xtend(defaultOpts, opts)
    
      // 将消息传入packet的payload
      packet = {
        cmd: 'publish',
        topic: topic,
        payload: message,
        qos: opts.qos,
        retain: opts.retain,
        messageId: this._nextId(),
        dup: opts.dup
      }
      // 处理不同qos
      switch (opts.qos) {
        case 1:
        case 2:
           // 发出publish packet
           this._sendPacketI(packet);
            ...
        default:
           this._sendPacket(packet);
            ...
      }
      return this
    }
    
    点赞 评论 复制链接分享
  • weixin_39610468 weixin_39610468 2020-12-09 12:43

    接收消息 mqtt.Client “message”事件

    实际代码

    js
    this.client.on('message', callback);
    

    数据以callback的方式接收。

    js
    function (topic, message, packet) {}
    

    topic代表接收到的topic,buffer则是具体的数据。 message是接收到的数据,谨记通过JSON.parse()对buffer做解析。

    js
    handleMessage(callback) {
        this.client.on('message', callback);
    }
    this.client.handleMessage((topic, buffer) => {
      let receiveMsg = null;
      try {
       receiveMsg = JSON.parse(buffer.toString());
      } catch (e) {
       receiveMsg = null;
      }
      if (!receiveMsg) {
        return;
      }
      ...do something with receiveMsg...
    });
    

    源码分析

    MqttClient使用inherits包继承了EventEmitter。 从而进行可以使用on监听“message”事件。

    js
    inherits(MqttClient, EventEmitter)
    

    那么到底是在哪里间发出message事件的呢?>emit the message event 1. 基于websocket-stream建立websocket连接 2. 使用pipe连接基于readable-stream.Writable创建的可写流 3. nextTick调用_handlePacket 4. 在handlePacket中调用handlePublish发出message事件

    1.基于websocket-stream建立websocket连接
    js
    this.stream = this.streamBuilder(this)
    function streamBuilder (client, opts) {
      return createWebSocket(client, opts)
    }
    var websocket = require('websocket-stream')
    function createWebSocket (client, opts) {
      var websocketSubProtocol =
        (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
          ? 'mqttv3.1'
          : 'mqtt'
    
      setDefaultOpts(opts)
      var url = buildUrl(opts, client)
      return websocket(url, [websocketSubProtocol], opts.wsOptions)
    }
    
    2. 使用pipe连接基于readable-stream.Writable创建的可写流
    js
    var Writable = require('readable-stream').Writable
    var writable = new Writable();
    this.stream.pipe(writable);
    
    3.nextTick调用_handlePacket
    js
    writable._write = function (buf, enc, done) {
        completeParse = done
        parser.parse(buf)
        work()
    }
    function work () {
        var packet = packets.shift()
        if (packet) {
          that._handlePacket(packet, nextTickWork)
        }
    }
    function nextTickWork () {
        if (packets.length) {
          process.nextTick(work)
        } else {
          var done = completeParse
          completeParse = null
          done()
        }
    }
    
    4. 在handlePacket中调用handlePublish发出message事件
    js
    MqttClient.prototype._handlePacket = function (packet, done) {
      switch (packet.cmd) {
        case 'publish':
          this._handlePublish(packet, done)
          break
       ...
    }
    // emit the message event
    MqttClient.prototype._handlePublish = function (packet, done) {
      switch (qos) {
        case 1: {
          // emit the message event
            if (!code) { that.emit('message', topic, message, packet) }
        }
    }
    
    点赞 评论 复制链接分享
  • weixin_39610468 weixin_39610468 2020-12-09 12:43

    参考资料: - https://cloud.tencent.com/developer/news/313537 - https://github.com/mqttjs/MQTT.js - https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.4.5.7bd4d07a16OtMV - https://help.aliyun.com/document_detail/96176.html?spm=a2c4g.11186623.2.13.4a6571d7y9NyuA#concept-96176-zh

    点赞 评论 复制链接分享

相关推荐