hite404 2023-09-09 21:23 采纳率: 0%
浏览 0

Tkinter突然无错误显示退出窗口(周末马上结束:(

Tips:代码是我自己写的
主体状况描述:在运行时忽然程序卡住1秒钟左右,这时整个窗口是未响应的,然后窗口直接退出,没有任何报错。
详细再现:在结束前会有“was expecting XX bytes but got XX bytes before the socket closed”这段话,但是程序内已经写好了错误处理,应该不会出现问题的才对
周末马上结束,万分火急。

from _thread import start_new_thread
from PIL import Image, ImageTk
from base64 import b64decode
from time import time, sleep
from os.path import abspath
from copy import deepcopy
import tkinter.ttk as ttk
import tkinter as tk
import subprocess
import pyperclip
import socket
import struct
import data
import json


outputs = []
threads_num = 120
progress = 0
threads = 0
last_progress = 0
last_progresses = []


class ServerInfoGeter:
    def __init__(self) -> None:
        self.sock = None
        self.timeout = 0.5
        self.host = ""  # 主机名
        self.port = 0  # 端口

    def get_info(self, ip: str, port=25565) -> dict | bool:
        self.host = ip
        self.port = port

        # 创建连接
        server_address = (ip, port)
        try:
            self.sock = socket.create_connection(server_address, self.timeout)
        except TimeoutError:
            log(f"端口:{port}->错误: 端口无响应")
            return False

        # 发送握手数据包
        self.sock.send(self.make_handshake(ip.encode(), port))

        # 发送状态请求数据包
        self.sock.send(self.make_request())

        try:
            data = self.read_packet()
            response = self.read_json_response(data)

            if 'version' in response and 'name' in response['version']:
                return response
            else:
                log(f"端口:{port}->错误: 数据不可用")
                self.sock.close()
                return False
        except Exception as e:
            log(f"端口:{port}->错误: {e}")
            self.sock.close()
            return False
        finally:
            self.sock.close()

    def make_handshake(self, host: str, port: int) -> bytes:
        """创建一个握手包"""
        # 第一个字节是数据包ID
        # 下一个字节是协议版本
        # 下一个字节是主机字符串长度和字符串
        # 下两个字节是端口
        # 下一个字节是状态
        try:
            data = b'\x00' + b'\x00' + \
                struct.pack(">b", len(host)) + host + \
                struct.pack(">h", port-32768) + b'\x01'
        except struct.error:
            raise Exception("未知错误")

        return self.make_packet(data)

    def make_request(self) -> bytes:
        """前三个字节是数据包 ID,接下来的字节是数据"""
        return self.make_packet(b'\x00')

    def read_packet(self) -> bytes:
        """读取包"""
        # 读取数据包长度
        length = self.decode_varint(self.sock.recv(2))
        # 获取足够长的数据
        return self.recvall(length)

    def read_json_response(self, data: bytes) -> dict:
        """解码数据"""
        try:
            json_data = data[3:].decode('utf-8', errors='ignore')
            return json.loads(json_data)
        except UnicodeDecodeError:
            raise Exception("解码错误")
        except IndexError:
            raise Exception("数据过短")
        except json.JSONDecodeError:
            raise Exception("数据解码错误")

    def make_packet(self, data: bytes):
        """制作一个包"""
        return struct.pack(">I", len(data)) + data

    def decode_varint(self, data):
        """ 解析计算机在通信协议中用varint表示数值的方法 """
        number = 0
        shift = 0
        for raw_byte in data:
            val_byte = raw_byte & 0x7F
            number |= (val_byte << shift)
            if raw_byte & 0x80 == 0:
                break
            shift += 7
        return number

    def recvall(self, length):
        data = b''
        while len(data) < length:
            more = self.sock.recv(length - len(data))
            if not more:
                raise EOFError('was expecting %d bytes but got %d bytes before the socket closed' % (
                    length, len(data)))
            data += more
        return data


def log(s: str):
    loger.insert("end", s)
    if len(loger.keys()) > 50:
        loger.delete(1.0)
    loger.see("end")


def out_s(s: str):
    outcomes.insert("end", s)


def get_sub_values(dic: dict, key: str):
    result = []
    if isinstance(dic, dict):
        for k, v in dic.items():
            if k == key:
                if isinstance(v, list):
                    result.extend(v)
                else:
                    result.append(v)
            elif isinstance(v, dict):
                result.extend(get_sub_values(v, key))
            elif isinstance(v, list):
                for item in v:
                    if isinstance(item, dict):
                        result.extend(get_sub_values(item, key))
    return result


def find_keys(s: str, dic: dict):
    js = dic
    nums = s.split("-")
    for i in range(len(nums)):
        try:
            js = get_sub_values(js, nums[i])[0]
        except:
            return None
    return js


def check_server_available(host: str):
    out = subprocess.getoutput("ping " + host + " -n 1")
    if out.find("丢失 = 0") > 0:
        return True
    else:
        return False


def scanning_thread(host: str, ports: list[int]):
    global threads
    global progress
    scaner = ServerInfoGeter()
    process_json = {}
    out = None
    for port in ports:
        out = scaner.get_info(host, port)
        progress += 1
        if out == False:
            continue
        process_json = {
            "host": host,
            "port": port,
            "time": time(),
            "details": out}
        process_json["text"] = find_keys("description-text", process_json)
        process_json["maxplayer"] = find_keys("players-max", process_json)
        process_json["player"] = find_keys("players-online", process_json)
        process_json["version"] = find_keys("version-name", process_json)
        process_json["icon"] = find_keys("favicon", process_json)

        completions = ["text", "maxplayer", "version", "player"]
        for completion in completions:
            if process_json.get(completion) == None:
                process_json[completion] = "未知"

        if process_json.get("icon", "114514") == None:
            process_json["icon"] = data.data
        else:
            process_json["icon"] = process_json["icon"][22:]

        out_s(f"IP->{host}:{port},标题->{process_json['text']}")
        outputs.append(process_json)

    threads -= 1


def _port_scanning():
    global outputs
    start_scanningb.config(state=tk.DISABLED)
    loger.delete(0, "end")
    outcomes.delete(0, "end")
    outputs = []
    log("开始扫描")
    start_new_thread(port_scanning, ())


def port_scanning():
    global progress
    global threads_num
    host = host_input.get("1.0", "end-1c")
    port = port_input.get("1.0", "end-1c")
    threads_str = threads_input.get("1.0", "end-1c")

    try:  # 尝试将输入的端口转换为int类型
        if 65535 > int(port) > 0:  # 判断端口是否在0~65535之间
            scanning_thread(host, [int(port)])
            log("扫描结束")
            start_scanningb.config(state=tk.NORMAL)
            return
    except:
        pass

    try:  # 尝试将输入的线程数转换为int类型
        threads_num = int(threads_str)
    except:
        pass

    if not check_server_available(host):
        log("服务器不可用")
        start_scanningb.config(state=tk.NORMAL)
        log("扫描结束")
        return
    progress = 0
    progress_bar["value"] = 0
    temp = []
    threads = threads_num

    log("正在启动线程")
    for i in range(1, 65535+1):
        temp.append(i)
        if len(temp) >= 65535//threads_num:  # 判断是否达到线程数
            start_new_thread(scanning_thread, (host, deepcopy(temp)))
            temp.clear()
            sleep(0.1)
    start_new_thread(scanning_thread, (host, temp))
    log("线程启动完毕")
    sleep(0.5)
    global last_progresses
    global last_progress
    while progress < 65530:
        sleep(0.1)
        try:
            speed = sum(last_progresses)/len(last_progresses)
        except ZeroDivisionError:
            speed = 0
        speed = str(speed*10).split(".")[0]
        text4.config(text=f"已扫描:{progress}/65535\n速度:{speed}/s")
        progress_bar.config(value=(progress/65535)*100)
        progress_bar.update()
        last_progresses.append(progress-last_progress)
        if len(last_progresses) > 50:
            last_progresses.pop(0)
        last_progress = progress
    start_scanningb.config(state=tk.NORMAL)
    log("扫描结束")


def get_icon(base: bytes) -> tk.PhotoImage:
    data = b64decode(base)
    with open("temp.png", "wb") as f:  # 写入图片
        f.write(data)
    with Image.open("temp.png") as image:  # 打开图片
        image = image.resize((128, 128))
        tk_image = ImageTk.PhotoImage(image)
    return tk_image


def on_select(_):
    temp = outcomes.curselection()
    if not temp:
        return
    ordinal = temp[0]
    del temp
    infoer = tk.Toplevel(root)
    infoer.title("服务器信息")

    json_info = outputs[ordinal]
    host_and_port = f"{json_info['host']}:{json_info['port']}"
    try:
        a = json_info["text"]
        del a
    except:
        return

    try:
        icon = get_icon(json_info["icon"])
        tk.Label(infoer, image=icon).pack()
    except:
        pass

    def show_info(text: str, font: int):
        tk.Label(infoer, text=text,
                 font=("Microsoft YaHei UI", font)).pack()
    show_info(f"{json_info['text']}", 22)
    show_info(f"服务器IP: {host_and_port}", 10)
    show_info(f"服务器人数: {json_info['player']}", 10)
    show_info(f"服务器最大人数: {json_info['maxplayer']}", 10)
    show_info(f"服务器版本: {json_info['version']}", 10)

    copy = tk.Button(infoer, text="复制IP",
                     command=lambda: pyperclip.copy(host_and_port))

    copy.pack()
    infoer.mainloop()


root = tk.Tk()
root.title("服务器端口扫描&获取信息器")

host_text = tk.Label(text="服务器IP(网址)输入")
host_input = tk.Text(root, height=1, width=45)
port_text = tk.Label(text="服务器端口自定义输入")
port_input = tk.Text(root, height=1, width=45)
threads_text = tk.Label(text="线程数自定义输入")
threads_input = tk.Text(root, height=1, width=25)
start_scanningb = tk.Button(root, text="开始扫描", command=_port_scanning)
text3 = tk.Label(text="日志"+" "*82+"扫描结果"+" "*25)
loger = tk.Listbox(root, height=15, width=35)
outcomes = tk.Listbox(root, height=15, width=65)
scroll_bar = tk.Scrollbar(root, orient="vertical",
                          command=outcomes.yview)
progress_bar = ttk.Progressbar(
    root, orient="horizontal", length=500)
text4 = tk.Label(text=f"已扫描:{progress}/65535\n速度:0/s")

outcomes.bind("<<ListboxSelect>>", on_select)
host_input.insert(tk.END, "cn-jx-plc-1.openfrp.top")
port_input.insert(tk.END, "41949")
threads_input.insert(tk.END, str(threads_num))
outcomes.configure(yscrollcommand=scroll_bar.set)

host_text.pack()
host_input.pack()
port_text.pack()
port_input.pack()
threads_text.pack()
threads_input.pack()
start_scanningb.pack()
progress_bar.pack(side="bottom", fill="x")
text4.pack(side="bottom", fill="x")
text3.pack()
loger.pack(side="left", fill="y")
scroll_bar.pack(side="right", fill="y")
outcomes.pack(fill="x")

root.mainloop()
  • 写回答

1条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2023-09-10 09:35
    关注

    引用 皆我百晓生 小程序回复内容作答:
    根据您提供的代码,程序在运行时突然卡住1秒钟左右,然后窗口直接退出,并没有报错信息。这可能是由于以下原因导致的:

    1. 网络连接问题:程序连接到远程服务器获取信息时可能发生网络连接问题,导致程序卡住并退出。您可以尝试检查网络连接是否正常,或者尝试使用其他网络环境运行程序。

    2. 线程同步问题:程序使用多线程进行端口扫描,可能出现线程同步问题导致程序异常退出。您可以尝试在多线程操作中添加适当的同步机制,如锁或信号量,以确保线程安全。

    3. 异常处理问题:程序内已经写好了错误处理,但是可能存在未捕获的异常导致程序崩溃。您可以尝试在代码中添加更详细的异常处理,以便捕获和处理可能出现的异常。

    4. 内存溢出问题:程序在扫描大量端口时,可能会消耗大量内存导致程序崩溃。您可以尝试减少线程数或优化程序逻辑,以降低内存消耗。

    由于您提供的代码较长且涉及网络连接和多线程操作,具体原因可能需要更详细的调试和测试。建议您在代码中添加适当的日志输出,以便在程序运行过程中查看各个步骤的执行情况和可能出现的错误。您可以在关键位置添加日志输出语句,如在网络连接、数据处理和异常处理的地方。

    另外,您可以尝试排除其他可能的问题,如:

    1. 检查代码逻辑:确保程序的逻辑正确,没有遗漏或错误的地方。可以逐步调试代码,观察每一步的执行情况,以确定问题所在。

    2. 检查依赖库和环境:确保程序所依赖的库已经正确安装,并且环境配置正确。有时候缺少某个依赖库或环境配置不正确会导致程序运行异常。

    3. 尝试简化代码:如果可能,可以尝试简化代码,只保留关键部分,以确保程序的基本功能可以正常运行。这样可以帮助您确定问题出现的具体位置。

    最后,如果问题仍然存在,建议您通过逐步调试和排查来找到问题的具体原因。可以使用调试器或添加更多的日志输出来追踪程序的执行过程,以便快速定位问题。

    评论

报告相同问题?

问题事件

  • 创建了问题 9月9日

悬赏问题

  • ¥15 想用adb命令做一个通话软件,播放录音
  • ¥30 Pytorch深度学习服务器跑不通问题解决?
  • ¥15 部分客户订单定位有误的问题
  • ¥15 如何在maya程序中利用python编写领子和褶裥的模型的方法
  • ¥15 Linux权限管理相关操作(求解答)
  • ¥15 Bug traq 数据包 大概什么价
  • ¥15 在anaconda上pytorch和paddle paddle下载报错
  • ¥25 自动填写QQ腾讯文档收集表
  • ¥15 DbVisualizer Pro 12.0.7 sql commander光标错位 显示位置与实际不符
  • ¥15 android 打包报错