项目背景
uniapp 项目使用 websocket 监听数据,使用了 rxjs 中的 Observable
相关代码
封装的 GlobalWebsocket 代码
import store from '@/store/index.js';
import Config from '@/core/config'
import {
Observable
} from "rxjs";
// 后端api地址
const wsHost = Config.get('wsUrl')
let ws;
let count = 0;
var subs = {};
let timer = {};
const MAX_RETRIES = 2000;
let trySendCount = 0;
let tempQueue = [];
let socketOpen = false;
const initWebSocket = () => {
let token = store.state.token ? store.state.token : store.getters.token;
const wsUrl = `${wsHost}/ws/messaging?:X_Access_Token=${token}&:X_Type=2`;
try {
//微信websocket最大并发不能超过5个
//https://developers.weixin.qq.com/miniprogram/dev/framework/ability/network.html
if (count > 0) {
return ws;
}
clearInterval(timer);
ws = uni.connectSocket({
url: wsUrl,
complete: () => {}
});
count += 1;
uni.onSocketClose(function() {
socketOpen = false;
ws = undefined;
setTimeout(initWebSocket, 5000 * count);
});
uni.onSocketOpen(function() {
socketOpen = true;
});
uni.onSocketMessage(function(msg) {
var data = JSON.parse(msg.data);
if (data.type === 'error') {
uni.showToast({
title: data.message,
icon: "none",
duration: 3500
})
}
if (subs[data.requestId]) {
if (data.type === 'complete') {
subs[data.requestId].forEach(function(element) {
element.complete();
});;
} else if (data.type === 'result') {
subs[data.requestId].forEach(function(element) {
element.next(data);
});;
}
}
});
} catch (error) {
setTimeout(initWebSocket, 5000 * count);
}
timer = setInterval(function() {
try {
ws && ws.readyState === 1 ? sendSocketMessage(JSON.stringify({
"type": "ping"
})) : 0;
} catch (error) {
console.error(error, '发送心跳错误');
}
//同时判断
if (tempQueue.length > 0 && ws && ws.readyState === 1) {
sendSocketMessage(tempQueue[0], 1);
}
}, 2000);
return ws;
};
//flag,是否处理tempQueue中的数据,如果发送失败,则不会重新加入,发送成功,则去除
function sendSocketMessage(msg, flag) {
if (socketOpen) {
uni.sendSocketMessage({
data: msg
});
if (flag === 1) {
tempQueue.splice(0, 1);
}
} else {
if (flag != 1) {
tempQueue.push(msg);
}
}
}
const getWebsocket = (id, topic, parameter) => {
return Observable.create(function(observer) {
if (!subs[id]) {
subs[id] = [];
}
subs[id].push({
next: function(val) {
observer.next(val);
},
complete: function() {
observer.complete();
}
});
var msg = JSON.stringify({
id: id,
topic: topic,
parameter: parameter,
type: 'sub'
});
var thisWs = initWebSocket();
if (thisWs) {
try {
sendSocketMessage(msg);
} catch (error) {
initWebSocket();
uni.showToast({
title: 'websocket服务连接失败',
icon: "none",
duration: 3500
})
}
} else {
tempQueue.push(msg);
ws = undefined
count = 0
initWebSocket();
}
return function() {
console.log("这里")
var unsub = JSON.stringify({
id: id,
type: "unsub"
});
console.log(subs[id])
console.log(unsub)
delete subs[id];
if (thisWs) {
sendSocketMessage(unsub)
}
};
});
};
exports.getWebsocket = getWebsocket;
使用 websocket 的位置
// 初始化 WebSocket
initWebSocket() {
const app = this
const productId = 'DC-TOWER';
const deviceId = '2209001';
const groupId = '$TOWER_SENSOR';
// deviceOnlineStatus && deviceOnlineStatus.unsubscribe();
app.deviceOnlineStatus = getWebsocket(
`location-info-status-online-${deviceId}`,
`/device/${productId}/${deviceId}/status`,
).subscribe((resp) => {
const { payload } = resp;
if (resp.requestId === `location-info-status-${deviceId}`) {
}
});
app.propertyStatus = getWebsocket(
`location-info-message-property-${deviceId}`,
`/device/${productId}/${deviceId}/${groupId}/message/property/report`,
).subscribe((resp) => {
console.log(resp)
const { payload } = resp;
if (resp.requestId === `location-info-message-property-${deviceId}`) {
}
});
},
removeSubscribe() {
console.log("解绑了")
const app = this
console.log(app.deviceOnlineStatus)
console.log(app.propertyStatus)
if (app.deviceOnlineStatus) {
app.deviceOnlineStatus.unsubscribe();
}
if (app.propertyStatus) {
app.propertyStatus.unsubscribe();
}
},
运行结果及报错内容
在 initWebsocket 方法中,两次调用 GlobalWebsocket 中的 getWebsocket() ,应该会通过传递的参数,发送两次监听
但是现在在 sub 之后,直接自己 unsub 了,而且在 unsub 之后按说不该接收到对应的 payload ,但是还是接收到了(这里没有调用 unsubscribe,它自己执行了)
我的解答思路和尝试过的方法
现在的分析是 : websocket ,没有调用 unsub 结果它自己执行 unsub,调用的时候反而没执行
发送 sub 是通过 GlobalWebsocket 中的 getWebsocket() 中发送的
而 unsub 则应该是调用 getWebsocket 返回的对象的unsubscribe来执行的
但是在上面的代码当中没有调用过 unsubscribe ,它自己在 返回 completed 之后 unsub 了,而且 debugg 的时候,在 getWebsocket 中返回的函数中打了断点,断点没有卡上,不过当中的输出语句还正常输出,正常执行了 unsub
我想要达到的结果
在调用 getWebsocket() 之后,通过传递的参数,执行 send 方法,发送 sub 订阅,当不需要的时候通过调用 unsubscribe 来发送 unsub