为什么能接收到消息,但是发送的消息,在mqttx里面却收不到
这是接收的代码
package com.yc.go.config.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
* MQTT消费端
*/
@Configuration
public class IotMqttSubscriberConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean(name = "mqttSubscriberClientFactory")
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(mqttConfig.getUsername());
mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setServerURIs(mqttConfig.getHostUrl().split(","));
mqttConnectOptions.setKeepAliveInterval(2);
factory.setConnectionOptions(mqttConnectOptions);
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
String[] inboundTopics = mqttConfig.getDefaultTopic().split(",");
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttConfig.getClientId() + "_inbound", mqttClientFactory(), inboundTopics); //对inboundTopics主题进行监听
adapter.setCompletionTimeout(5000);
adapter.setQos(1);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel") //异步处理
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("----------------------");
System.out.println("message:" + message.getPayload());
System.out.println("PacketId:" + message.getHeaders().getId());
System.out.println("Qos:" + message.getHeaders().get(MqttHeaders.QOS));
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
System.out.println("topic:" + topic);
}
};
}
}
这是发送的代码
package com.yc.go.config.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @author yc
*/
@Configuration
public class IotMqttProducerConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(mqttConfig.getUsername());
mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setServerURIs(mqttConfig.getHostUrl().split(","));
mqttConnectOptions.setKeepAliveInterval(60);
mqttConnectOptions.setConnectionTimeout(60);
factory.setConnectionOptions(mqttConnectOptions);
return factory;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getClientId() + "outbound", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
}
```java
@Service
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 发送到mqtt
*
* @param payload 有效载荷
*/
void sendToMqtt(String payload);
/**
* 发送到mqtt
*
* @param topic 主题
* @param payload 消息内容
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 发送到mqtt
*
* @param topic 主题
* @param qos qos
* @param payload 消息内容
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
但是这一段代码却可以发送
@PostMapping("/index")
public String index() {
String broker = "tcp://broker.emqx.io:1883";
// TLS/SSL
// String broker = "ssl://broker.emqx.io:8883";
String topic = "testtopic1";
String username = "emqx";
String password = "public";
String clientid = "publish_client";
String content = "Hello MQTT";
int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
// 设置用户名和密码
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setKeepAliveInterval(60);
options.setConnectionTimeout(60);
// 连接
client.connect(options);
// 创建消息并设置 QoS
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
// 发布消息
client.publish(topic, message);
System.out.println("Message published");
System.out.println("topic: " + topic);
System.out.println("message content: " + content);
// 断开连接
client.disconnect();
// 关闭客户端
client.close();
} catch (MqttException e) {
throw new RuntimeException(e);
}
return "index";
}
```