YZRHANYU 2022-11-27 20:56 采纳率: 57.1%
浏览 19
已结题

uniapp websocket 发送 sub 监听后,代码自动发送了 unsub ,解除了监听

项目背景

uniapp 项目使用 websocket 监听数据,使用了 rxjs 中的 Observable

相关代码

封装的 GlobalWebsocket 代码


import store from '@/store/index.js';
import Config from '@/core/config'
import {
    Observable
} from "rxjs";

// 后端api地址
const wsHost = Config.get('wsUrl')
let ws;
let count = 0;
var subs = {};
let timer = {};
const MAX_RETRIES = 2000;
let trySendCount = 0;
let tempQueue = [];
let socketOpen = false;
const initWebSocket = () => {
    let token = store.state.token ? store.state.token : store.getters.token;
    const wsUrl = `${wsHost}/ws/messaging?:X_Access_Token=${token}&:X_Type=2`;
    try {
        //微信websocket最大并发不能超过5个
        //https://developers.weixin.qq.com/miniprogram/dev/framework/ability/network.html

        if (count > 0) {
            return ws;
        }
        clearInterval(timer);
        ws = uni.connectSocket({
            url: wsUrl,
            complete: () => {}
        });
        count += 1;
        uni.onSocketClose(function() {
            socketOpen = false;
            ws = undefined;
            setTimeout(initWebSocket, 5000 * count);
        });
        uni.onSocketOpen(function() {
            socketOpen = true;
        });
        uni.onSocketMessage(function(msg) {
            var data = JSON.parse(msg.data);
            if (data.type === 'error') {
                uni.showToast({
                    title: data.message,
                    icon: "none",
                    duration: 3500
                })
            }
            if (subs[data.requestId]) {
                if (data.type === 'complete') {
                    subs[data.requestId].forEach(function(element) {
                        element.complete();
                    });;
                } else if (data.type === 'result') {
                    subs[data.requestId].forEach(function(element) {
                        element.next(data);
                    });;
                }
            }
        });
    } catch (error) {
        setTimeout(initWebSocket, 5000 * count);
    }

    timer = setInterval(function() {
        try {
            ws && ws.readyState === 1 ? sendSocketMessage(JSON.stringify({
                "type": "ping"
            })) : 0;
        } catch (error) {
            console.error(error, '发送心跳错误');
        }
        //同时判断
        if (tempQueue.length > 0 && ws && ws.readyState === 1) {
            sendSocketMessage(tempQueue[0], 1);
        }
    }, 2000);
    return ws;
};

//flag,是否处理tempQueue中的数据,如果发送失败,则不会重新加入,发送成功,则去除
function sendSocketMessage(msg, flag) {
    if (socketOpen) {
        uni.sendSocketMessage({
            data: msg
        });
        if (flag === 1) {
            tempQueue.splice(0, 1);
        }
    } else {
        if (flag != 1) {
            tempQueue.push(msg);
        }
    }
}


const getWebsocket = (id, topic, parameter) => {
    return Observable.create(function(observer) {
        if (!subs[id]) {
            subs[id] = [];
        }
        subs[id].push({
            next: function(val) {
                observer.next(val);
            },
            complete: function() {
                observer.complete();
            }
        });
        var msg = JSON.stringify({
            id: id,
            topic: topic,
            parameter: parameter,
            type: 'sub'
        });
        var thisWs = initWebSocket();
        if (thisWs) {
            try {
                sendSocketMessage(msg);
            } catch (error) {
                initWebSocket();
                uni.showToast({
                    title: 'websocket服务连接失败',
                    icon: "none",
                    duration: 3500
                })
            }
        } else {
            tempQueue.push(msg);
            ws = undefined
            count = 0
            initWebSocket();
        }
        return function() {
            console.log("这里")
            
            var unsub = JSON.stringify({
                id: id,
                type: "unsub"
            });
            
            console.log(subs[id])
            console.log(unsub)
            delete subs[id];
            if (thisWs) {
                sendSocketMessage(unsub)
            }
        };
    });
};
exports.getWebsocket = getWebsocket;

使用 websocket 的位置

    // 初始化 WebSocket
            initWebSocket() {
                const app = this
                const productId = 'DC-TOWER';
                const deviceId = '2209001';
                const groupId = '$TOWER_SENSOR';
                // deviceOnlineStatus && deviceOnlineStatus.unsubscribe();
                app.deviceOnlineStatus = getWebsocket(
                  `location-info-status-online-${deviceId}`,
                  `/device/${productId}/${deviceId}/status`,
                ).subscribe((resp) => {
                  const { payload } = resp;

                  if (resp.requestId === `location-info-status-${deviceId}`) {

                  }
                });
                

                
                app.propertyStatus = getWebsocket(
                  `location-info-message-property-${deviceId}`,
                  `/device/${productId}/${deviceId}/${groupId}/message/property/report`,
                ).subscribe((resp) => {
                    console.log(resp)
                  const { payload } = resp;
                  if (resp.requestId === `location-info-message-property-${deviceId}`) {

                  }
                });
            },
            removeSubscribe() {
                console.log("解绑了")
                const app = this
                console.log(app.deviceOnlineStatus)
                console.log(app.propertyStatus)
                if (app.deviceOnlineStatus) {
                    
                    app.deviceOnlineStatus.unsubscribe();
                }
                if (app.propertyStatus) {
                    app.propertyStatus.unsubscribe();
                }
            },

运行结果及报错内容

在 initWebsocket 方法中,两次调用 GlobalWebsocket 中的 getWebsocket() ,应该会通过传递的参数,发送两次监听
但是现在在 sub 之后,直接自己 unsub 了,而且在 unsub 之后按说不该接收到对应的 payload ,但是还是接收到了(这里没有调用 unsubscribe,它自己执行了)

img

我的解答思路和尝试过的方法

现在的分析是 : websocket ,没有调用 unsub 结果它自己执行 unsub,调用的时候反而没执行
发送 sub 是通过 GlobalWebsocket 中的 getWebsocket() 中发送的
而 unsub 则应该是调用 getWebsocket 返回的对象的unsubscribe来执行的
但是在上面的代码当中没有调用过 unsubscribe ,它自己在 返回 completed 之后 unsub 了,而且 debugg 的时候,在 getWebsocket 中返回的函数中打了断点,断点没有卡上,不过当中的输出语句还正常输出,正常执行了 unsub

我想要达到的结果

在调用 getWebsocket() 之后,通过传递的参数,执行 send 方法,发送 sub 订阅,当不需要的时候通过调用 unsubscribe 来发送 unsub

  • 写回答

3条回答 默认 最新

  • 普通网友 2022-11-27 22:47
    关注
    获得7.50元问题酬金

    1、你的做法有点尴尬;
    2、uniapp跟其他前端的操作方式有点不一样;
    3、你可以试试搞一个全局变量,然后根据状态来处理ws即可。

    评论

报告相同问题?

问题事件

  • 系统已结题 12月5日
  • 赞助了问题酬金15元 11月27日
  • 创建了问题 11月27日

悬赏问题

  • ¥15 零基础200题编字典问题
  • ¥15 win11移除微软账户登录
  • ¥15 部署zabbix登录时跳转如下页面
  • ¥15 cup+fpga+88E1111 rgmii to sgmii
  • ¥15 请问如何从gprmax中导出的merged.out文件获取雷达数据(应该是个二维数组吧),我看到网上很多对雷达数据的操作但是都没有说如何获得这个数据,因为out文件不能直接操作要转成其他格式是吧
  • ¥15 usb相机曝光度expos增加一点,获取图像速度显著变慢。
  • ¥15 如何解决如图问题:我创建的java maven项目中使用了03版本的excel,我先进行了文件读取的操作(可以正常取出,文件还未损坏),文件还未损坏),然后在进行了对同一文件的写入操作
  • ¥100 对接GooglePay/GoogleWallet咨询
  • ¥15 Odoo 17系统中如何配置自动更新生产成本功能
  • ¥15 如何提取京东订单生成QQ支付链接