sung_luo 2024-04-01 14:02 采纳率: 0%
浏览 7

drf websoket图片流不稳定


class ResourcesConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        # 建立连接
        print("开始连接")
        await self.accept()
    async def connect_to_remote_ws(self,text_data):
        other_websocket_url = f"ws://{text_data}:8000"
        try:
            print("开始连接client")
            # self.remote_websoket=await websockets.connect(other_websocket_url)
            self.remote_websoket=await asyncio.wait_for(websockets.connect(other_websocket_url), timeout=5)
            asyncio.create_task(self.forward_messages_from_remote())
        except Exception as e:
            print("连接报错",e)
            await self.send(json.dumps({"error":f'连接到远程websoket失败:{str(e)}'}))
    async def forward_messages_from_remote(self):
        #持续监听远程websokets的消息并转发给本地客户端
        try:
            print("self.remote_websoket",self.remote_websoket)
            async for message in self.remote_websoket:
                await self.send(text_data=base64.b64encode(message).decode('utf-8'))
                await asyncio.sleep(0.001)
        except Exception as e:
            print("远程websoket连接关闭1",e)
            await self.remote_websoket.close()
        except websockets.exceptions.ConnectionClosed:
            print("远程websoket连接关闭")
            await self.remote_websoket.close()
    async def receive(self, text_data=None, bytes_data=None):
        # 接收消息
        try:
            print("接收消息text",text_data)
            print("接收消息byte",bytes_data)
            aa=await self.connect_to_remote_ws(text_data)
            print("aa",aa)
        except Exception as e:
            print("接收e",e)

    async def disconnect(self, code):
        # 关闭
        print("断开连接")
        self.close()

以上是我的drf 的websoket从远程客户端获取图片流的代码,想要做到,接收到参数(连接的远程ws主机ip),再去连接远程ws,只要我没主动断开就要一直传。目前的问题是,前端点击关闭后再次点击间隔时间长,有时未响应,连接上之后没达到我自动关闭的之间,自己就断开了

import win32gui
import cv2,time,datetime,os
import numpy as np
from PyQt5.QtWidgets import QApplication
import sys
import _thread
import asyncio
import websockets
import socket
from PIL import ImageGrab
import io
class ScreenCapture:
    def __init__(self):
        self.frame = None
        self.b_frame = None
        self.connected = False
    def writetxt(self,string):
        logpath=os.path.join('D:\\AOImonitor','log')
        datpath = os.path.join(logpath,'result.log')
        if not os.path.exists(logpath):
            os.makedirs(logpath, exist_ok=True)
        if not os.path.exists(datpath):
            with open(datpath, 'w'):
                pass
        with open(datpath, "w", encoding='utf-8') as file:
            file.write(string)
        return '报错写入成功'
    def writeerrlog(self,string):
        logpath=os.path.join('D:\\AOImonitor','imageerrlog')
        logtime = (datetime.datetime.now()).strftime('%Y-%m-%d %H:%M:%S')
        datpath = os.path.join(logpath,'%s.log')%datetime.date.today()
        if not os.path.exists(logpath):
            os.makedirs(logpath, exist_ok=True)
        if not os.path.exists(datpath):
            with open(datpath, 'w'):
                pass
        with open(datpath, "a+", encoding='utf-8') as file:
            file.write(logtime + ' ' + string + '\n')
        return '报错写入成功'
    def systemerrlog(self,string):
        logpath=os.path.join('D:\\AOImonitor','systemerrlog')
        logtime = (datetime.datetime.now()).strftime('%Y-%m-%d %H:%M:%S')
        datpath = os.path.join(logpath,'%s.log')%datetime.date.today()
        if not os.path.exists(logpath):
            os.makedirs(logpath, exist_ok=True)
        if not os.path.exists(datpath):
            with open(datpath, 'w'):
                pass
        with open(datpath, "a+", encoding='utf-8') as file:
            file.write(logtime + ' ' + string + '\n')
        return '报错写入成功'
    def open_camera(self):
        try:
            print("Opening camera...")
            self.writeerrlog("进入传图启动函数")
            self.writetxt("PASS")
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            # 连接到一个公共的DNS服务器
            sock.connect(('8.8.8.8', 80))
            # 获取连接特定的DNS
            CamIP = sock.getsockname()[0]
            # 关闭socket连接
            sock.close()
            # CamIP = socket.gethostbyname(socket.gethostname())
            print("CamIP",CamIP)
            self.writeerrlog(f"当前连接IP :{CamIP}")
            _thread.start_new_thread(self.run_server, (CamIP,))
            self.writeerrlog("多线程运行传图程式")
        except Exception as e:
            self.systemerrlog("获取ip失败,请检查是否联网")
    async def start_streaming(self, websocket, path):
        self.connected = True
        # 找到程序窗口
        hwnd = win32gui.FindWindow(None, "Cable_AOI_3")
        try:
            while self.connected:
                if hwnd:
                    # 获取电脑主屏幕
                    screen = QApplication.primaryScreen()
                    # 获取当前程序界面
                    rect=win32gui.GetWindowRect(hwnd)
                    screenShotImg=ImageGrab.grab(bbox=(rect[0]+14,rect[1],rect[2]-14,rect[3]-14))
                    imgByteArr = io.BytesIO()
                    screenShotImg.save(imgByteArr, format='JPEG')
                    self.frame = cv2.cvtColor(np.array(screenShotImg), cv2.COLOR_RGB2BGR)
                    if not np.array_equal(self.frame, self.b_frame):
                        self.b_frame = np.copy(self.frame)
                        _, img_encoded = cv2.imencode('.jpg', self.frame, [cv2.IMWRITE_JPEG_QUALITY, 50])
                        data = img_encoded.tobytes()
                        try:
                            await websocket.send(data)
                        except Exception as e:
                            print(e)
                            self.connected = False
                            self.systemerrlog("传图报错:"+str(e))
                    time.sleep(0.001)
        except Exception as e:
            self.connected = False
            self.writetxt("FAIL")
            self.systemerrlog("传图停止:"+str(e))

    def run_server(self, CamIP):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        start_server = websockets.serve(self.start_streaming, CamIP, 8000)
        loop.run_until_complete(start_server)
        loop.run_forever()

    def stop_streaming(self):
        self.connected = False
if __name__ == "__main__":
    app = QApplication(sys.argv)
    capture = ScreenCapture()
    capture.open_camera()
    sys.exit(app.exec_())



import asyncio
import websockets
import socket
import logging
from logging.handlers import TimedRotatingFileHandler
import sys
from collections import defaultdict

# 禁用websockets库的连接打开状态输出
logging.getLogger('websockets').setLevel(logging.ERROR)
# 配置日志记录
log_file = "server.log"  # 日志文件路径
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# 创建文件处理器,按日期切割日志文件
file_handler = TimedRotatingFileHandler(
    log_file, when='midnight', backupCount=7)
file_handler.setLevel(logging.INFO)

# 创建终端处理器
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)

# 设置日志记录格式
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)

# 添加处理器到日志记录器
logger.addHandler(file_handler)
logger.addHandler(stream_handler)


# 定义ForwardServer类
class ForwardServer:
    def __init__(self):
        # 初始化
        self.CamIP = socket.gethostbyname(socket.gethostname())  # 获取本机IP
        self.Port = 9000
        self.forward_websockets = {}  # 存储client websocket连接
        self.user_websockets = {}  # 存儲user websocket连接
        self.message_queues = {}  # 存储消息队列
        self.num_connections = {}  # 存储连接数量
        self.timeout_duration = 1  # 设置超时时间

    # 定义连接超时函数
    # async def connect_with_timeout(self, uri, timeout, websocket):
    #     try:
    #         # 尝试连接,如果超时则抛出异常
    #         connect_task = asyncio.create_task(websockets.connect(uri))
    #         return await asyncio.wait_for(connect_task, timeout)
    #     # except:
    #     #     logging.error("Connect timeout...")  # 记录超时信息
    #     #     logging.info("-"*40)  # 添加行分隔符
    #     #     await websocket.send("Connect timeout...")
    #     finally:
    #         pass
    async def connect_with_timeout(self, uri, timeout, websocket):
        try:
            # 尝试连接,如果超时则抛出异常
            connect_task = websockets.connect(uri)
            websocket = await asyncio.wait_for(connect_task, timeout)
            return websocket
        except asyncio.TimeoutError:
            logging.error("Connect timeout...")  # 记录超时信息
            logging.info("-"*40)  # 添加行分隔符
            await websocket.send("Connect timeout...")
            await websocket.close()  # 关闭WebSocket连接
        except websockets.exceptions.WebSocketException as e:
            logging.error(f"WebSocket error occurred: {e}")
            logging.info("-"*40)
            await websocket.send(f"WebSocket error occurred: {e}")
            await websocket.close()  # 关闭WebSocket连接
        except Exception as e:
            logging.error(f"An error occurred: {e}")
            logging.info("-"*40)
            await websocket.send(f"An error occurred: {e}")
            await websocket.close()  # 关闭WebSocket连接
    # 定义消息转发函数
    async def forward_messages(self, websocket, path):
        try:
            # await asyncio.sleep(1)
            client_id = await websocket.recv()  # 获取客户端ID
            print("client_id",client_id)
            logging.info(f"Open IP: {websocket.remote_address[0]} connected: {client_id}")
            ip=client_id.split(':')
            print("ip",ip)
            print("len(ip)",len(ip))
            if len(ip)==2:
                self.user_websockets[websocket] = ip[0]
                if ip[0] not in self.num_connections:
                    self.num_connections[ip[0]]=[websocket.remote_address[0]]
                else:
                    if websocket.remote_address[0] not in  self.num_connections[ip[0]]:
                        self.num_connections[ip[0]].append(websocket.remote_address[0])# 增加连接数量
                    else:
                        pass 
                print('总数',len(self.num_connections.get(ip[0], 0)))
                sendinfo=str(self.Port)+','+str(len(self.num_connections.get(ip[0], 0)))
                await websocket.send(sendinfo)
            else:
                forward_uri = f"ws://{client_id}:8000"  # 设置转发的URI
                if client_id in self.forward_websockets:  # 如果已经存在连接,则直接使用
                    forward_websocket = self.forward_websockets[client_id]
                    message_queue = self.message_queues[client_id]
                    if forward_websocket.open:
                        logging.info(f"Connection already exists for {client_id}")
                    else:
                        self.forward_websockets.pop(client_id)
                        if len(self.num_connections[client_id])==1:
                            self.num_connections.pop(client_id)
                            logging.info(f"Client closed for {client_id}")
                        else:
                            self.num_connections[client_id].remove(websocket.remote_address[0])
                            logging.info(f"Client closed for {websocket.remote_address[0]}")
                else:  # 如果不存在连接,则创建新的连接和消息队列
                    forward_websocket = await self.connect_with_timeout(forward_uri, self.timeout_duration, websocket)
                    if forward_websocket is None:
                        return
                    self.forward_websockets[client_id] = forward_websocket
                    message_queue = asyncio.Queue(maxsize=1)
                    self.message_queues[client_id] = message_queue
                    # self.num_connections[client_id] = 0
                    asyncio.create_task(self.forward_message_handler( websocket, client_id))  # 创建消息处理任务
                self.user_websockets[websocket] = client_id
                # self.num_connections[client_id] += 1  # 增加连接数量
                # logging.info(f"Number of connections for {client_id}: {len(self.num_connections.get(client_id))}")
                # logging.info("-"*40)  # 添加行分隔符
                while True:  # 持续接收和转发消息
                    response = await message_queue.get()
                    await websocket.send(response)  # 转发消息
                    await asyncio.sleep(0.001)
        except Exception as e:  # 处理异常
            try:
                logging.info(f"报错: {e}")
                logging.info(f"Closed: {client_id}")
                self.num_connections[client_id].remove(websocket.remote_address[0])  # 减少连接数量
                if len(self.num_connections[client_id]) == 0:  # 如果连接数量为0,则关闭连接和消息队列
                    forward_websocket = self.forward_websockets.pop(client_id)
                    await forward_websocket.close()
                    self.message_queues.pop(client_id)
                    self.num_connections.pop(client_id)
                else:
                    logging.info(f"Number of connections for {client_id}: {self.num_connections.get(client_id, 0)}")
                    logging.info("-"*40)  # 添加行分隔符
            except:
                pass
        finally:
            pass

    # 定义消息处理函数
    async def forward_message_handler(self, websocket, client_id):
        try:
            while True:  # 持续接收和存储消息
                response = await self.forward_websockets[client_id].recv()
                # 直接替换队列中的数据项,丢弃旧的数据项
                try:
                    self.message_queues[client_id].put_nowait(response)
                except asyncio.QueueFull:
                    # 队列已满,丢弃旧的数据项
                    self.message_queues[client_id].get_nowait()
                    self.message_queues[client_id].put_nowait(response)
                # print(self.message_queues[client_id].qsize())
                await asyncio.sleep(0.001)
        except Exception as e:  # 处理异常
            try:
                if client_id in self.forward_websockets:
                    self.forward_websockets.pop(client_id)
                    self.num_connections.pop(client_id)
                logging.info(f"Connections {client_id} Disconnect....")
                logging.info(
                    f"Number of connections for {client_id}: {self.num_connections.get(client_id, 0)}")
                logging.info("-"*40)  # 添加行分隔符
            except:
                pass

    async def user_task(self):
        while True:
            try:
                # 初始化一个空字典用于存储是否存在有效的websocket连接
                istrue = {}

                # 当存在user websocket连接时
                if len(self.user_websockets) > 0:
                    # 使用defaultdict对user_websockets进行分组
                    grouped_dict = defaultdict(list)
                    for key, value in self.user_websockets.items():
                        grouped_dict[value].append(key)

                    # 将分组后的defaultdict转换为普通字典
                    lists = dict(grouped_dict)

                    # 遍历每个客户端ID
                    for client_id in lists:
                        # 检查每个客户端ID对应的所有websocket连接是否打开
                        istrue[client_id] = [
                            websocket.open for websocket in lists[client_id]]

                    # 遍历每个客户端ID
                    for client_id in istrue:
                        # 检查是否所有的websocket连接都已关闭
                        all_false = all(
                            value == False for value in istrue[client_id])
                        if all_false and client_id in self.forward_websockets:
                            await self.forward_websockets[client_id].close()
                            self.message_queues.pop(client_id)
                            self.num_connections.pop(client_id)
                            # 刪除user_websockets的緩存數據
                            keys_to_delete = [
                                key for key, value in self.user_websockets.items() if value == client_id]
                            for key in keys_to_delete:
                                self.user_websockets.pop(key)
                            # 记录日志
                            logging.info("Task Except")
                            logging.info(
                                f"Connections {client_id} Disconnect....")
                            logging.info(
                                f"Number of connections for {client_id}: {self.num_connections.get(client_id, 0)}")
                            logging.info("-"*40)  # 添加行分隔符

                # 每次循环后暂停1秒
                await asyncio.sleep(1)
            except:
                pass


# 创建服务器对象
server = ForwardServer()

# 创建并运行任务


async def run_tasks():
    logging.info("----Server Started----")
    logging.info("-----Version V0.2-----")
    logging.info(f"Server listening on {server.CamIP}:{server.Port}")
    logging.info("-"*40)  # 添加行分隔符
    start_server = websockets.serve(
        server.forward_messages, server.CamIP, server.Port)
    tasks = [start_server, server.user_task()]
    await asyncio.gather(*tasks)


# 运行任务
asyncio.run(run_tasks())


  • 写回答

3条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-04-01 14:02
    关注

    引自免费微信小程序:皆我百晓生

    在处理Drf的websockets从远程客户端获取图片流时,以下是我建议的解决步骤和代码实现:

    1. 连接远程WS

      • 首先,创建一个ResourcesConsumer类,并添加必要的属性如URLWebSocketSession等用于配置连接参数和进行网络操作。
      class ResourcesConsumer(AsyncWebsocketConsumer):
          url = 'http://your-websocket-host:8000'
          ws_session = None
      
          @classmethod
          async def create(cls, connection_params):
              if cls.ws_session is None:
                  cls.ws_session = await websocket_connect(cls.url, *connection_params)
              return cls.ws_session
      
    2. 设置断点

      • connect_to_remote_ws()方法内,设置断点以跟踪远程websoket是否成功建立连接。
      @asyncio.coroutine
      def connect_to_remote_ws(text_data):
          try:
              self.ws_session = await websocket_connect(
                  URL=self.url,
                  ws_key=random_string(),
                  server_side=True,
                  heartbeat_interval=30,
                  connection_timeout=30,
                  method='GET',
                  protocols=['websocket'],
                  transports=[('websocket', 'text')],
                  options={'check_client_id': False}
              )
              print(f'Successful WebSocket connection established')
              yield from asyncio.sleep(0.001)  # 调试并检查发送延迟
          except Exception as e:
              print(f'Connection to remote websoket failed:', str(e))
              await self.ws_session.close()
              raise e
      
    3. 传递参数

      • forward_messages_from_remote()方法中,将文本数据和远程websoket上的消息作为参数传入。
      @asyncio.coroutine
      def forward_messages_from_remote(text_data, message):
          print(f'Receiving messages from remote websoket: {message}')
          while True:
              data = await self.ws_session.recv()
              if not data:
                  break
              message_data = base64.b64decode(data).decode('utf-8')
              await asyncio.sleep(0.001)
              await self.ws_session.send(json.dumps({'message': message_data}))
      
    4. 断开连接

      • 使用close()方法关闭本地websocket连接。
      @asyncio.coroutine
      def disconnect(code):
          if code == 1:
              print('Server closed gracefully')
              self.ws_session.close()
              yield from asyncio.sleep(0.001)
          else:
              print('Client disconnected abruptly', code)
              self.ws_session.close()
              raise Exception(f'Disconnected code {code}')
      
    5. 绘制流程图

      • 使用Python中的graphviz库创建流程图,以便可视化应用程序的运行状态和通信过程。
      import graphviz as gv
      
      with open('websockets_flow.png', 'w') as f:
          fig = gv.Figure(figsize=(10, 6))
          fig.add_node('RemoteWebsoket')
          fig.add_node('ResourceConsumer')
          fig.add_edge('RemoteWebsoket', 'ResourceConsumer')
      
          flow = [['Start'], ['ResourcesConsumer Connect', 'Server Side WebSocket Connection'], ['Forward Messages From Remote Websoket', 'Sending message to Client'], ['ResourceConsumer Receive', 'Receiving messages from Client'], ['Client Disconnect', 'Client Disconnected'], ['End']]
          g = gv.SVG(f, layout=dict(directed=False))
          g.node['RemoteWebsoket'].pos = (200, 150)
          g.node['ResourceConsumer'].pos = (200, 200)
          g.edge['RemoteWebsoket']['ResourceConsumer'].pos = (190, 200)
          g.edge['ResourceConsumer']['RemoteWebsoket'].pos = (190, 150)
          for edge in flow:
              g.edge[edge[0]][edge[1]].pos = (200, 150 + edge[1])
      
          g.render(f)
          f.seek(0)
      

      这段代码会在文件名为websockets_flow.png的路径下创建一个表示WebSockets通信流程的图形。

    这个解决方案的思路是在远程websockets服务器与资源consumer之间建立一个异步连接,当有新的请求到来时,根据协议类型和远程server端的配置(包括ws_keyheartbeat_intervalconnection_timeoutmethodprotocols)来初始化WebSocket连接,并在响应客户端请求之前,通过send()函数向服务器推送一系列的转发消息,直到与本地client建立有效的连接为止。在这个过程中,我们遵循了WebSocket编程规范,包括使用ws_key字典键保存连接信息、在每次心跳周期中尝试连接、在连接失败时立即关闭连接以及在断开连接时发出通知和清除存储在WebSocket连接上的信息。这将确保图像流始终保持稳定,即使当前端界面被关闭或用户重新打开浏览器窗口时,也能保持对远程ws服务器的连接状态更新。

    注意:以上代码仅为示例,具体实现可能因服务器端的具体架构和环境而有所不同。例如,你需要根据实际需求调整random_string()函数的实现方式、server_side=Truetransport=[('websocket', 'text')]等参数值,同时根据可能存在的并发请求和限制性能情况,优化重连策略以提高程序的稳定性。在编写代码时,还需要考虑可能出现的数据同步、异常处理等问题,确保代码具有良好的可扩展性和可维护性。

    评论 编辑记录

报告相同问题?

问题事件

  • 修改了问题 4月2日
  • 修改了问题 4月1日
  • 创建了问题 4月1日

悬赏问题

  • ¥15 office打开卡退(新电脑重装office系统后)
  • ¥300 FLUENT 火箭发动机燃烧EDC仿真
  • ¥15 【Hadoop 问题】Hadoop编译所遇问题hadoop-common: make failed with error code 2
  • ¥15 vb6.0+webbrowser无法加载某个网页求解
  • ¥15 RPA财务机器人采购付款流程
  • ¥15 计算机图形多边形及三次样条曲线绘制
  • ¥15 根据protues画的图用keil写程序
  • ¥200 如何使用postGis实现最短领规划?
  • ¥15 pyinstaller打包错误
  • ¥20 cesm的气溶胶排放文件