class ResourcesConsumer(AsyncWebsocketConsumer):
async def connect(self):
# 建立连接
print("开始连接")
await self.accept()
async def connect_to_remote_ws(self,text_data):
other_websocket_url = f"ws://{text_data}:8000"
try:
print("开始连接client")
# self.remote_websoket=await websockets.connect(other_websocket_url)
self.remote_websoket=await asyncio.wait_for(websockets.connect(other_websocket_url), timeout=5)
asyncio.create_task(self.forward_messages_from_remote())
except Exception as e:
print("连接报错",e)
await self.send(json.dumps({"error":f'连接到远程websoket失败:{str(e)}'}))
async def forward_messages_from_remote(self):
#持续监听远程websokets的消息并转发给本地客户端
try:
print("self.remote_websoket",self.remote_websoket)
async for message in self.remote_websoket:
await self.send(text_data=base64.b64encode(message).decode('utf-8'))
await asyncio.sleep(0.001)
except Exception as e:
print("远程websoket连接关闭1",e)
await self.remote_websoket.close()
except websockets.exceptions.ConnectionClosed:
print("远程websoket连接关闭")
await self.remote_websoket.close()
async def receive(self, text_data=None, bytes_data=None):
# 接收消息
try:
print("接收消息text",text_data)
print("接收消息byte",bytes_data)
aa=await self.connect_to_remote_ws(text_data)
print("aa",aa)
except Exception as e:
print("接收e",e)
async def disconnect(self, code):
# 关闭
print("断开连接")
self.close()
以上是我的drf 的websoket从远程客户端获取图片流的代码,想要做到,接收到参数(连接的远程ws主机ip),再去连接远程ws,只要我没主动断开就要一直传。目前的问题是,前端点击关闭后再次点击间隔时间长,有时未响应,连接上之后没达到我自动关闭的之间,自己就断开了
import win32gui
import cv2,time,datetime,os
import numpy as np
from PyQt5.QtWidgets import QApplication
import sys
import _thread
import asyncio
import websockets
import socket
from PIL import ImageGrab
import io
class ScreenCapture:
def __init__(self):
self.frame = None
self.b_frame = None
self.connected = False
def writetxt(self,string):
logpath=os.path.join('D:\\AOImonitor','log')
datpath = os.path.join(logpath,'result.log')
if not os.path.exists(logpath):
os.makedirs(logpath, exist_ok=True)
if not os.path.exists(datpath):
with open(datpath, 'w'):
pass
with open(datpath, "w", encoding='utf-8') as file:
file.write(string)
return '报错写入成功'
def writeerrlog(self,string):
logpath=os.path.join('D:\\AOImonitor','imageerrlog')
logtime = (datetime.datetime.now()).strftime('%Y-%m-%d %H:%M:%S')
datpath = os.path.join(logpath,'%s.log')%datetime.date.today()
if not os.path.exists(logpath):
os.makedirs(logpath, exist_ok=True)
if not os.path.exists(datpath):
with open(datpath, 'w'):
pass
with open(datpath, "a+", encoding='utf-8') as file:
file.write(logtime + ' ' + string + '\n')
return '报错写入成功'
def systemerrlog(self,string):
logpath=os.path.join('D:\\AOImonitor','systemerrlog')
logtime = (datetime.datetime.now()).strftime('%Y-%m-%d %H:%M:%S')
datpath = os.path.join(logpath,'%s.log')%datetime.date.today()
if not os.path.exists(logpath):
os.makedirs(logpath, exist_ok=True)
if not os.path.exists(datpath):
with open(datpath, 'w'):
pass
with open(datpath, "a+", encoding='utf-8') as file:
file.write(logtime + ' ' + string + '\n')
return '报错写入成功'
def open_camera(self):
try:
print("Opening camera...")
self.writeerrlog("进入传图启动函数")
self.writetxt("PASS")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 连接到一个公共的DNS服务器
sock.connect(('8.8.8.8', 80))
# 获取连接特定的DNS
CamIP = sock.getsockname()[0]
# 关闭socket连接
sock.close()
# CamIP = socket.gethostbyname(socket.gethostname())
print("CamIP",CamIP)
self.writeerrlog(f"当前连接IP :{CamIP}")
_thread.start_new_thread(self.run_server, (CamIP,))
self.writeerrlog("多线程运行传图程式")
except Exception as e:
self.systemerrlog("获取ip失败,请检查是否联网")
async def start_streaming(self, websocket, path):
self.connected = True
# 找到程序窗口
hwnd = win32gui.FindWindow(None, "Cable_AOI_3")
try:
while self.connected:
if hwnd:
# 获取电脑主屏幕
screen = QApplication.primaryScreen()
# 获取当前程序界面
rect=win32gui.GetWindowRect(hwnd)
screenShotImg=ImageGrab.grab(bbox=(rect[0]+14,rect[1],rect[2]-14,rect[3]-14))
imgByteArr = io.BytesIO()
screenShotImg.save(imgByteArr, format='JPEG')
self.frame = cv2.cvtColor(np.array(screenShotImg), cv2.COLOR_RGB2BGR)
if not np.array_equal(self.frame, self.b_frame):
self.b_frame = np.copy(self.frame)
_, img_encoded = cv2.imencode('.jpg', self.frame, [cv2.IMWRITE_JPEG_QUALITY, 50])
data = img_encoded.tobytes()
try:
await websocket.send(data)
except Exception as e:
print(e)
self.connected = False
self.systemerrlog("传图报错:"+str(e))
time.sleep(0.001)
except Exception as e:
self.connected = False
self.writetxt("FAIL")
self.systemerrlog("传图停止:"+str(e))
def run_server(self, CamIP):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(self.start_streaming, CamIP, 8000)
loop.run_until_complete(start_server)
loop.run_forever()
def stop_streaming(self):
self.connected = False
if __name__ == "__main__":
app = QApplication(sys.argv)
capture = ScreenCapture()
capture.open_camera()
sys.exit(app.exec_())
import asyncio
import websockets
import socket
import logging
from logging.handlers import TimedRotatingFileHandler
import sys
from collections import defaultdict
# 禁用websockets库的连接打开状态输出
logging.getLogger('websockets').setLevel(logging.ERROR)
# 配置日志记录
log_file = "server.log" # 日志文件路径
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 创建文件处理器,按日期切割日志文件
file_handler = TimedRotatingFileHandler(
log_file, when='midnight', backupCount=7)
file_handler.setLevel(logging.INFO)
# 创建终端处理器
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)
# 设置日志记录格式
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# 添加处理器到日志记录器
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
# 定义ForwardServer类
class ForwardServer:
def __init__(self):
# 初始化
self.CamIP = socket.gethostbyname(socket.gethostname()) # 获取本机IP
self.Port = 9000
self.forward_websockets = {} # 存储client websocket连接
self.user_websockets = {} # 存儲user websocket连接
self.message_queues = {} # 存储消息队列
self.num_connections = {} # 存储连接数量
self.timeout_duration = 1 # 设置超时时间
# 定义连接超时函数
# async def connect_with_timeout(self, uri, timeout, websocket):
# try:
# # 尝试连接,如果超时则抛出异常
# connect_task = asyncio.create_task(websockets.connect(uri))
# return await asyncio.wait_for(connect_task, timeout)
# # except:
# # logging.error("Connect timeout...") # 记录超时信息
# # logging.info("-"*40) # 添加行分隔符
# # await websocket.send("Connect timeout...")
# finally:
# pass
async def connect_with_timeout(self, uri, timeout, websocket):
try:
# 尝试连接,如果超时则抛出异常
connect_task = websockets.connect(uri)
websocket = await asyncio.wait_for(connect_task, timeout)
return websocket
except asyncio.TimeoutError:
logging.error("Connect timeout...") # 记录超时信息
logging.info("-"*40) # 添加行分隔符
await websocket.send("Connect timeout...")
await websocket.close() # 关闭WebSocket连接
except websockets.exceptions.WebSocketException as e:
logging.error(f"WebSocket error occurred: {e}")
logging.info("-"*40)
await websocket.send(f"WebSocket error occurred: {e}")
await websocket.close() # 关闭WebSocket连接
except Exception as e:
logging.error(f"An error occurred: {e}")
logging.info("-"*40)
await websocket.send(f"An error occurred: {e}")
await websocket.close() # 关闭WebSocket连接
# 定义消息转发函数
async def forward_messages(self, websocket, path):
try:
# await asyncio.sleep(1)
client_id = await websocket.recv() # 获取客户端ID
print("client_id",client_id)
logging.info(f"Open IP: {websocket.remote_address[0]} connected: {client_id}")
ip=client_id.split(':')
print("ip",ip)
print("len(ip)",len(ip))
if len(ip)==2:
self.user_websockets[websocket] = ip[0]
if ip[0] not in self.num_connections:
self.num_connections[ip[0]]=[websocket.remote_address[0]]
else:
if websocket.remote_address[0] not in self.num_connections[ip[0]]:
self.num_connections[ip[0]].append(websocket.remote_address[0])# 增加连接数量
else:
pass
print('总数',len(self.num_connections.get(ip[0], 0)))
sendinfo=str(self.Port)+','+str(len(self.num_connections.get(ip[0], 0)))
await websocket.send(sendinfo)
else:
forward_uri = f"ws://{client_id}:8000" # 设置转发的URI
if client_id in self.forward_websockets: # 如果已经存在连接,则直接使用
forward_websocket = self.forward_websockets[client_id]
message_queue = self.message_queues[client_id]
if forward_websocket.open:
logging.info(f"Connection already exists for {client_id}")
else:
self.forward_websockets.pop(client_id)
if len(self.num_connections[client_id])==1:
self.num_connections.pop(client_id)
logging.info(f"Client closed for {client_id}")
else:
self.num_connections[client_id].remove(websocket.remote_address[0])
logging.info(f"Client closed for {websocket.remote_address[0]}")
else: # 如果不存在连接,则创建新的连接和消息队列
forward_websocket = await self.connect_with_timeout(forward_uri, self.timeout_duration, websocket)
if forward_websocket is None:
return
self.forward_websockets[client_id] = forward_websocket
message_queue = asyncio.Queue(maxsize=1)
self.message_queues[client_id] = message_queue
# self.num_connections[client_id] = 0
asyncio.create_task(self.forward_message_handler( websocket, client_id)) # 创建消息处理任务
self.user_websockets[websocket] = client_id
# self.num_connections[client_id] += 1 # 增加连接数量
# logging.info(f"Number of connections for {client_id}: {len(self.num_connections.get(client_id))}")
# logging.info("-"*40) # 添加行分隔符
while True: # 持续接收和转发消息
response = await message_queue.get()
await websocket.send(response) # 转发消息
await asyncio.sleep(0.001)
except Exception as e: # 处理异常
try:
logging.info(f"报错: {e}")
logging.info(f"Closed: {client_id}")
self.num_connections[client_id].remove(websocket.remote_address[0]) # 减少连接数量
if len(self.num_connections[client_id]) == 0: # 如果连接数量为0,则关闭连接和消息队列
forward_websocket = self.forward_websockets.pop(client_id)
await forward_websocket.close()
self.message_queues.pop(client_id)
self.num_connections.pop(client_id)
else:
logging.info(f"Number of connections for {client_id}: {self.num_connections.get(client_id, 0)}")
logging.info("-"*40) # 添加行分隔符
except:
pass
finally:
pass
# 定义消息处理函数
async def forward_message_handler(self, websocket, client_id):
try:
while True: # 持续接收和存储消息
response = await self.forward_websockets[client_id].recv()
# 直接替换队列中的数据项,丢弃旧的数据项
try:
self.message_queues[client_id].put_nowait(response)
except asyncio.QueueFull:
# 队列已满,丢弃旧的数据项
self.message_queues[client_id].get_nowait()
self.message_queues[client_id].put_nowait(response)
# print(self.message_queues[client_id].qsize())
await asyncio.sleep(0.001)
except Exception as e: # 处理异常
try:
if client_id in self.forward_websockets:
self.forward_websockets.pop(client_id)
self.num_connections.pop(client_id)
logging.info(f"Connections {client_id} Disconnect....")
logging.info(
f"Number of connections for {client_id}: {self.num_connections.get(client_id, 0)}")
logging.info("-"*40) # 添加行分隔符
except:
pass
async def user_task(self):
while True:
try:
# 初始化一个空字典用于存储是否存在有效的websocket连接
istrue = {}
# 当存在user websocket连接时
if len(self.user_websockets) > 0:
# 使用defaultdict对user_websockets进行分组
grouped_dict = defaultdict(list)
for key, value in self.user_websockets.items():
grouped_dict[value].append(key)
# 将分组后的defaultdict转换为普通字典
lists = dict(grouped_dict)
# 遍历每个客户端ID
for client_id in lists:
# 检查每个客户端ID对应的所有websocket连接是否打开
istrue[client_id] = [
websocket.open for websocket in lists[client_id]]
# 遍历每个客户端ID
for client_id in istrue:
# 检查是否所有的websocket连接都已关闭
all_false = all(
value == False for value in istrue[client_id])
if all_false and client_id in self.forward_websockets:
await self.forward_websockets[client_id].close()
self.message_queues.pop(client_id)
self.num_connections.pop(client_id)
# 刪除user_websockets的緩存數據
keys_to_delete = [
key for key, value in self.user_websockets.items() if value == client_id]
for key in keys_to_delete:
self.user_websockets.pop(key)
# 记录日志
logging.info("Task Except")
logging.info(
f"Connections {client_id} Disconnect....")
logging.info(
f"Number of connections for {client_id}: {self.num_connections.get(client_id, 0)}")
logging.info("-"*40) # 添加行分隔符
# 每次循环后暂停1秒
await asyncio.sleep(1)
except:
pass
# 创建服务器对象
server = ForwardServer()
# 创建并运行任务
async def run_tasks():
logging.info("----Server Started----")
logging.info("-----Version V0.2-----")
logging.info(f"Server listening on {server.CamIP}:{server.Port}")
logging.info("-"*40) # 添加行分隔符
start_server = websockets.serve(
server.forward_messages, server.CamIP, server.Port)
tasks = [start_server, server.user_task()]
await asyncio.gather(*tasks)
# 运行任务
asyncio.run(run_tasks())