关于mqtt断线重连后消息停留在broker上,没有推送到客户端

最近在学习mqtt,拿到例程来实验发现离线后推送的信息不能在重连后接收到,这是为什么?我用wireshark照着手册看了一遍没毛病啊,而且断线重连后订阅的消息有些能收到,有些会停留在代理上,求解!!!

 public class MQTTPublish implements MqttCallback {  

    //public static final String HOST = "tcp://10.0.0.250:1884";
    public static final String HOST = "tcp://192.168.67.130:61613"; 

    public static final String TOPIC = "MQTTtest";  
    private static final String clientid ="publisher";   

    private static final String str = "ad钙奶";
    private static MqttClient client;  

    private MqttTopic topic;  
    private String userName = "admin";  
    private String passWord = "password";  

    private MqttMessage message;  

    public MQTTPublish() throws MqttException {  
        client = new MqttClient(HOST, clientid, new MemoryPersistence());  
        connect();  
    }  

    private void connect() {  
        MqttConnectOptions options = new MqttConnectOptions();  
        options.setCleanSession(false);  
        options.setUserName(userName);  
        options.setPassword(passWord.toCharArray());  
        options.setConnectionTimeout(10);   
        options.setKeepAliveInterval(20);  
        try {  
               client.setCallback(this);  

               client.connect(options);  
               topic = client.getTopic(TOPIC);  
        } catch (Exception e) {  
               e.printStackTrace();  
        }  
    }  

    public void publish(MqttMessage message) throws MqttPersistenceException, MqttException{  
        MqttDeliveryToken token = topic.publish(message);  
        token.waitForCompletion();  
        System.out.println("Token is complete:" + token.isComplete());  
    }  

    public static void main(String[] args) throws MqttException {  
        MQTTPublish mqttpub =  new MQTTPublish();  
        mqttpub.message = new MqttMessage();  
        mqttpub.message.setQos(2);  
        mqttpub.message.setRetained(true);  
        mqttpub.message.setPayload(str.getBytes());  
        mqttpub.publish(mqttpub.message);  
        System.out.println("Ratained state:" + mqttpub.message.isRetained());  

        client.disconnect();
        System.out.println("Disconnected");
        System.exit(0);
    }

    @Override
    public void connectionLost(Throwable arg0) {
        // TODO Auto-generated method stub

    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        try {
            System.out.println("deliveryComplete---------"+ token.isComplete());  
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }

    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        // TODO Auto-generated method stub

    }  


} 
 public class MQTTSubscribe implements MqttCallback {  

    //public static final String HOST = "tcp://10.0.0.250:1884";
    public static final String HOST = "tcp://192.168.67.130:61613";  

    public static final String TOPIC = "MQTTtest";  
    private static final String clientid = "subscriber";  
    private MqttClient client;  
    private MqttConnectOptions options;  
    private String userName = "admin";  
    private String passWord = "password";  
    private ScheduledExecutorService scheduler;  


    public void startReconnect() {  
        scheduler = Executors.newSingleThreadScheduledExecutor();  
        scheduler.scheduleAtFixedRate(new Runnable() {  
            public void run() {  
                if (!client.isConnected()) {  
                    try {  
                        client.connect(options);  
                    } catch (MqttSecurityException e) {  
                        e.printStackTrace();  
                    } catch (MqttException e) {  
                        e.printStackTrace();  
                    }  
                }  
            }  
        }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);  
    }   

    private void start() {  
        try {    
            client = new MqttClient(HOST, clientid, new MemoryPersistence());  
            options = new MqttConnectOptions();  
            options.setCleanSession(false);  
            options.setUserName(userName);  
            options.setPassword(passWord.toCharArray());  
            options.setConnectionTimeout(10);   
            options.setKeepAliveInterval(20);  
            client.setCallback(this); 
//            MqttTopic topic = client.getTopic(TOPIC);  
//            options.setWill(topic, "close".getBytes(), 2, true);  

            client.connect(options);  
            int[] Qos  = {2};  
            String[] topic1 = {TOPIC};  
            client.subscribe(topic1, Qos);  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  


    public static void main(String[] args) throws MqttException {     
        MQTTSubscribe client = new MQTTSubscribe();  
        client.start();  
    }

    public void connectionLost(Throwable cause) {  
        System.out.println("Connection lost, reconnect please!");  
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
            try {
                System.out.println("deliveryComplete---------"+ token.isComplete());  
            } catch (Exception e) {
                e.printStackTrace();
            } 
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
          System.out.println("Message arrived on topic:"+topic);  
          System.out.println("Message arrived on QoS:"+message.getQos());  
          System.out.println("Message arrived on content:"+new String(message.getPayload()));   
    }  

}  

2个回答

对于离线消息为什么不能接收到,我想说的是 你很有可能在再次连接的时候又重新订阅了该topic。这样是接收不到离线消息的。
离线消息是订阅了某个topic的client 在断开连接以后,在接下来又重新连接的时候,任然可以收到在它断开连接的这段时间内,该topic上
的消息。这就需要将client的 setCleanSession 设置为false,这样服务器才能保留它的session,再次建立连接的时候,它就会继续使用这个session了。
注意:clientId 是不能更改的。

只是进行了重连,还需要再次发起订阅

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
MQTT断线重连
MQTT客户端:org.eclipse.paho.client.mqttv3 MQTT服务器:EMQ MQTT服务器官网:http://emqtt.com/ 如果第一次看MQTT,可以参考:http://blog.csdn.net/whb3299065/article/details/79088928 在之前的文章中我们简单介绍了MQTT的收发消息,并没有实现重连机制,我在实现重连时,发现...
MQTT断线重连及订阅消息恢复
MQTT断线重连及订阅消息恢复 工具 MQTT客户端是用的 pahohttp://www.eclipse.org/paho/ 问题 采用以下配置 connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setConnectionT...
C# MQTT 断线重连
初次接触MQTT这篇博客借鉴了:https://blog.csdn.net/lxrj2008/article/details/76067242自己做了一些修改1.添加全局静态变量 uPLibrary.Networking.M2Mqtt.MQTTConfig.IsSocketRun;class MQTTConfig{ public static bool IsSocketRun = false; ...
c#Socket客户端断线重连
c#socket异步编程及断线重连。并怎样实现循环接受服务器的命令并处理。
Netty客户端断线重连
参考:https://www.jianshu.com/p/c78b37a2ca47 与安卓交互需要用到 特此记录 public class NettyClient { public final static String HOST = "192.168.31.178"; public final static int PORT = 9527; private ...
iOS MQTT使用案例 (断线重连)
iOS MQTT使用详解 (断线重连)参考了 iOS MQTT—-MQTTClient实战-看这篇的就够了 大神写的这篇git: MQTT-Client-Framework介绍啥的看百度,上面大神写的就行了,直接上干货。安装:pod 'MQTTClient'创建一个单例管理类来处理MQTT有关操作导入: #import<MQTTClient/MQTTClient>MQTTSession初始化:-
MQTT断线重连订阅无法接收
工具 MQTT客户端是用的 paho http://www.eclipse.org/paho/ 问题 采用以下配置 connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setConnectionTimeout(10); ...
websocket断线和重连
项目将应用python、mysql、redis、tornado、sqlalchemy、sockjs、dplayer、wtforms、werkzeug等技术打造一个完整弹幕视频+多人在线聊天室。我将实战过程中带大家快速掌握python mtv的设计模式、基于线程池的异步io非阻塞、websocket实时长连接的技术原理。rn
QTcpSocket断线重连
m_pTcpSocket=new QTcpSocket; connect(m_pTcpSocket,SIGNAL(disconnected()), this,SLOT(slotDisconnected()),Qt::DirectConnection); m_pTcpSocket->connectToHost(HostAddr,Port); if(m
8.Netty之断线重连
Netty之断线重连
Netty的断线重连
因为工作中经常使用到TCP,所以会频繁使用到诸如Mina或Netty之类的通信框架,为了方便项目的逻辑调用,经常会在框架的基础上再一次进行封装,这样做其实有画蛇添足的嫌疑,但也是无奈之举。 /** * 提供重连功能,需传入bootstrap,并实现handlers */ @ChannelHandler.Sharable public abstract class FunctionsChanne...
Mina 断线重连
Mina 断线重连     定义:这里讨论的Mina 断线重连是指使用mina作为客户端软件,连接其他提供Socket通讯服务的服务器端。Socket服务器可以是Mina提供的服务器,也可以是C++提供的服务器。            一、断线重连的方式;     1. 在创建Mina客户端时增加一个监听器,或者增加一个拦截器,当检测到Session关闭时,自动进行重连。    ...
断线重连
我想写一个远程控制数据库的小程序,能实现断线重连,请问各位有什么好的方法?(新手一枚[img=https://forum.csdn.net/PointForum/ui/scripts/csdn/Plugin/001/face/13.gif][/img])
okHttpWebSocket断线重连
[size=14px]okHttpWebSocket断线重连怎么做啊??各位哥!![/size]
obs-studio 断线重连
obs发送逻辑在rtmp-stream.c文件中 暴露出的重连接口  /**  * Sets the reconnect settings.  Set retry_count to 0 to disable reconnecting.  */ EXPORT void obs_output_set_reconnect_settings(obs_output_t *output, int
断线重连机制
Zookeeper的客户端具有自动重连机制,当出现网络异常时,客户端会自动重连直到与集群中的某台机器连接成功,连接过程如下图所示: 1. 网络异常情况包括网络闪断、ZK服务器宕机等情况,这会导致连接断开CONNECTION_LOSS,此时客户端会收到事件None-Disconnected; 2. 如果在SessionTimeout时间内连接成功,则客户端收到事件None-SynConnect
校园网断线重连
很好的,很好的解决了校园网断线重连的问题,方便广大用户的切实需要
断线重连总结
断线重连总结 gateserver负责所有与客户端的直接连接 m_conns[10000]也就是一个gateserver最多可以维持10000条socket连接,蓝月采用的是tcp 行走各种消息都是tcp,不存在丢包一说,只会延迟 每个客户端点击登录时会做以下事情 建立socket tcp连接,向gateserver 的ip port发送请求, gateserver收到
数据库断线重连
数据库断线重连是指,在服务器出现某些原因导致数据库连接中断,需要启动重新连接数据库,并重新执行中断的数据库操作。 Thinkphp5.0 是支持数据库断线重连的,代码很值得学习。它支持查询 query(包括select查询等读取操作),执行 execute (包括insert、update等写入操作)和 事务 startTrans() 的断线重连 这三种类型的数据库断线重连操作。 下面
websocket 断线重连
摘要 websocket reconnect websocket是html5发布之后出现的一种新技术,说它是新技术,其实也不是多新的技术了,因为毕竟也有2-3年了,但是找了很多国内的实例,缺发现不多,不知道是它不好用呢,还是说这种技术原来就有缺陷呢,咱们暂且不说,今天我们就来介绍一下websocket的断线重连,,,, 这里先提供一个类库,https://github.com/j
MINA断线重连.
MINA断线重连. [url]http://chwshuang.iteye.com/blog/2028951[/url] Netty4更新详解 [url]http://janeky.iteye.com/blog/1844201[/url]
手游断线重连
断线重连,其实并不是一个神马高深的东西,相信各位做游戏的程序们都处理过这个问题,但是,怎么把这个断线重连做的安全,完善呢?下面就跟各位大神来讨论讨论这个问题~! 游戏中,断线重连(这里是基于TCP长连来讨论的)其实跟游戏类型有关,比如页游,端游,还是手游。当然,我只做过页游和手游,所以咱不对端游发表意见。对于页游而言,一般是PC网络是很稳定的,如果不稳定,其实是没办法玩游戏的,所...
js定时器断线重连,
我通过ajax请求action,action返回信息停止定时器,可是断网重连后定时器会弹出好几次信息才停止rnfunction getAlertMessage() rn var url = $("#alertMaessage").val();rn $.post(url, , function(data) rn var loggerinfo = data.split("_");rn if (loggerinfo[1] == "success") rn alert("导入成功");rn clearInterval(alertTimer);rnrn else if (loggerinfo[1] == "fail") rn alert("导入失败");rn clearInterval(alertTimer);rn rn );rn
tcpclient断线重连
现在我只有一个可用本地端口,要求与对方服务器保持连接。rn当检测到断线的时候,对tcpclient进行close,然后重新使用这个本地端口进行重连。rn释放以后有时候能重连成功,但是经常端口保持着fin_wait状态或者偶尔是close_wait状态,导致不能通过这个端口与对方建立连接rn有什么办法能保证close的时候关闭端口,使得我可以通过这个端口进行重连?
TCP的断线重连
当断线的时候,我想用原有的SOCKET重连。如果不closesocket出现10056的错误,用一个strsocket保存后,closesocket,然后用strsocket连接,出现10038的错误。rn有什么办法能够不改变socket的值重连呢?
断线重连 心跳
using System.Collections;using System.Collections.Generic;using UnityEngine;using System.Net;using System.Net.Sockets;using UnityEngine.UI;using System;using System.Text;[XLua.LuaCallCSharp]public cla...
C#+Socket客户端断线重连的解决办法
流程如下: 程序开始 -> 先connect一下服务端 -> 若连接成功 -> 程序继续执行该干嘛干嘛                                                        |                                                        |                            
Delphi socket客户端断线重连
Delphi socket客户端断线重连,怎么样才能实现呢?
java socket客户端断线重连
java socket client 断线重连的简单实现 有什么意见可以提哦
socket 客户端断线重连的问题。
[code=C/C++][/code]rnrn//初始化WinSockrn WSADATA wsaData;rn if (WSAStartup(MAKEWORD(2,2),&wsaData)!=NO_ERROR)rn rn std::cout<<"Initialization failed"<h_addr_list[0],pHost->h_length);rnrn std::cout<<"ServerIp:"<<(int) RemoteAddr.sin_addr.S_un.S_un_b.s_b1<<"."rn <<(int) RemoteAddr.sin_addr.S_un.S_un_b.s_b2<<"."rn <<(int) RemoteAddr.sin_addr.S_un.S_un_b.s_b3<<"."rn <<(int) RemoteAddr.sin_addr.S_un.S_un_b.s_b4<
mina客户端断线重连、心跳设置
最近用mina写客户端一点心得,分享下,如果有不对的地方,希望留言更正。   编码、解码工厂就不写了,网上有很多。   private static NioSocketConnector connector ; private static IoSession session; public static IoSession getIoSession(){ retu...
求助:SCTP 断线后重连问题
目前正在做一个基于SCTP下的客户端rn与服务器端连接rn网络环境:debian 3.1 ,内核 2.6.8rnrn建立连接后,长时间没有数据包通过,会发heartbeat,debian下默认是30秒rnrn可是如何服务器端突然断电,客户端应该释放资源然后重新与服务器端建立链接rnrn问题是,客户端如何才能知道服务器端已经异常了?rnrnSCTP有heartbeat功能,可是,上层的应用程序怎么知道heartbeat超时了呢?rnrn应该是可以设置,然后进行监听,但不知道具体如何解决...rnrn下面这个链接说明了在TCP上的解决方法,可是,却没找到SCTP下的rnhttp://topic.csdn.net/t/20050418/11/3945333.htmlrnrn正郁闷中,有无良策,多谢
ArcGIS Engine 断线后重连
ArcGIS Engine 断线重连使用ArcGIS Engine进行二次开发时,如果连接断开后,再执行任何操作都没有反应,只能重新启动应用程序,这篇文章就是用来解决这个问题。通过查阅ArcGIS的文档,我们得知WorkspaceFactory实现了IWorkspaceFactoryStatus 这是一个可选接口(optional),通过查询ArcObjects SDK 的帮助文档,知道Sde
ESP8266开发,实现MQTT客户端,与MQTT服务器通信,断线重连
ESP8266开发,实现MQTT客户端,与MQTT服务器通信,断线重连,有软件安装、使用及详细的代码注释,方便快速搭建自己的物联网系统。
android 实现mqtt消息推送,以及不停断线重连的问题解决
前段时间项目用到mqtt的消息推送,整理一下代码,代码的原型是网上找的,具体哪个地址已经忘记了。代码的实现是新建了一个MyMqttService,全部功能都在里面实现,包括连服务器,断线重连,订阅消息,处理消息,发布消息等基本操作。首先添加依赖:dependencies {     implementation 'org.eclipse.paho:org.eclipse.paho.client.m...
mqtt协议 springboot2.0.4 mqttv3 发布订阅代码调用,mqtt断线重连
上篇博文讲了安装和配置:https://blog.csdn.net/jianeng_Love_IT/article/details/83061717 mqttv3 发布订阅代码调用 我用的是springboot2.0.4 直接上代码: pom.xml &amp;lt;dependency&amp;gt; &amp;lt;groupId&amp;gt;org.eclipse.paho&amp;lt;...
MQTT 客户端收发 MQTT 消息
本文主要介绍如何使用 MQTT 客户端收发 MQTT 消息,并给出示例代码供前期开发测试参考,包括资源创建、环境准备、示例代码、注意事项等。 注意: 本文给出的实例均基于 Eclipse Paho Java SDK 实现,SDK 下载请参见 MQTT 接入准备。如使用其他第三方的客户端,请适当修改。 1. 资源创建 使用 MQ 提供的 MQTT 服务,首先需要核实应用中使用的 Topic ...
c#socket异步及断线重连
c#socket异步编程及断线重连。并怎样实现循环接受服务器的命令并处理。
netty 断线重连+心跳
netty使用自带工具类实现断线重连和心跳包
ActiveMQ的断线重连机制
断线重连机制是ActiveMQ的高可用性具体体现之一。 具体就是使用failover方式,使得连接断开之后,可以不断的重试连接到一个或多个brokerURL。 例如:failover:(tcp://127.0.0.1:61616) ,这里可以使用多个url。 默认情况下,如果client与broker直接的connection断开,则client会新起一个线程, 不断的从url参数中获取...
相关热词 c#检测非法字符 c#双屏截图 c#中怎么关闭线程 c# 显示服务器上的图片 api嵌入窗口 c# c# 控制网页 c# encrypt c#微信网页版登录 c# login 居中 c# 考试软件