doushenjia8514 2018-06-28 17:56

Subscribing to MQTT messages using a goroutine

I currently have Go code that subscribes to a topic and prints the sensor data published to it. Here is my code:

package main

import (
    "crypto/tls"
    "flag"
    "fmt"
    //"log"
    "os"
    "os/signal"
    "strconv"
    "syscall"
    "time"
    MQTT "github.com/eclipse/paho.mqtt.golang"
)

func onMessageReceived(client MQTT.Client, message MQTT.Message) {
    //fmt.Printf("Received message on topic: %s\nMessage: %s\n", message.Topic(), message.Payload())
    fmt.Printf("%s\n", message.Payload())
}

func main() {
    //MQTT.DEBUG = log.New(os.Stdout, "", 0)
    //MQTT.ERROR = log.New(os.Stdout, "", 0)
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    hostname, _ := os.Hostname()

    server := flag.String("server", "tcp://test.mosquitto.org:1883", "The full url of the MQTT server to connect to ex: tcp://127.0.0.1:1883")
    topic := flag.String("topic", "topic/sensorTemperature", "Topic to subscribe to")


    qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
    clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection")
    username := flag.String("username", "", "A username to authenticate to the MQTT server")
    password := flag.String("password", "", "Password to match username")
    flag.Parse()

    connOpts := MQTT.NewClientOptions().AddBroker(*server).SetClientID(*clientid).SetCleanSession(true)
    if *username != "" {
        connOpts.SetUsername(*username)
        if *password != "" {
            connOpts.SetPassword(*password)
        }
    }
    tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
    connOpts.SetTLSConfig(tlsConfig)

    connOpts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil {
            panic(token.Error())
        }
    }

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    } else {
        fmt.Printf("Connected to %s\n", *server)
    }

    <-c
}

Instead of handling messages like this, I want the handling to run in a goroutine, i.e. I want to be able to call go onMessageReceived(...). How can I do that when the function is passed to c.Subscribe and called by the library? And how can I pass a sync.WaitGroup to it? Thank you.


1 answer

  • donglieshe4692 2018-06-28 18:49

    Since you're passing the function as a parameter to another function, you don't get to control the way in which it is called. However, you do have total control over what happens inside the function - which means you can start a goroutine there:

    func onMessageReceived(client MQTT.Client, message MQTT.Message) {
        go func() {
            fmt.Printf("%s\n", message.Payload())
        }()
    }
    

    So, onMessageReceived itself will still be called synchronously by MQTT, but it will just start a goroutine and immediately return. You could also define a separate function and call that with go instead of an anonymous function:

    func onMessageReceived(client MQTT.Client, message MQTT.Message) {
        go messageHandler(client, message)
    }
    
    func messageHandler(client MQTT.Client, message MQTT.Message) {
        fmt.Printf("%s\n", message.Payload())
    }
    

    That's just a matter of how you want to organize your code. If it's a short handler I'd probably stick with the anonymous function (short enough that you can see the entire anonymous func on one screen); for a longer function I'd break it up or break it out into a named function.
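The claim that the callback returns right away while the goroutine keeps working can be checked without a broker. The following is only a sketch using the standard library: message and dispatch are hypothetical stand-ins for MQTT.Message and onMessageReceived, and the gate channel simulates slow work so the ordering is observable.

```go
package main

import "fmt"

// message is a hypothetical stand-in for MQTT.Message (assumption, not the paho type).
type message struct{ payload []byte }

// dispatch mimics onMessageReceived: it starts a goroutine and returns at once.
// The goroutine blocks on gate (simulated slow work) and then sends the payload to out.
func dispatch(msg message, gate <-chan struct{}, out chan<- string) {
    go func() {
        <-gate
        out <- string(msg.payload)
    }()
}

func main() {
    gate := make(chan struct{})
    out := make(chan string, 1)

    // The call returns even though the work cannot have finished yet.
    dispatch(message{payload: []byte("23.1")}, gate, out)
    fmt.Println("handler returned; results ready:", len(out))

    // Let the goroutine finish, then read its result.
    close(gate)
    fmt.Println("payload:", <-out)
}
```

Because the caller is never blocked, nothing stops the program from exiting while such goroutines are still running, which is where a WaitGroup becomes useful.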

    Since the callback signature is fixed, you can't pass a WaitGroup in as an extra parameter. The simplest option is to make it a package-level variable, and to call wg.Wait() before the program exits:

    var wg = new(sync.WaitGroup)
    
    func onMessageReceived(client MQTT.Client, message MQTT.Message) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Printf("%s\n", message.Payload())
        }()
    }
    
    This answer was accepted by the asker.
