Tips:代码是我自己写的
主体状况描述:在运行时忽然程序卡住1秒钟左右,这时整个窗口是未响应的,然后窗口直接退出,没有任何报错。
详细再现:在结束前会有“was expecting XX bytes but got XX bytes before the socket closed”这段话,但是程序内已经写好了错误处理,应该不会出现问题的才对
周末马上结束,万分火急。
from _thread import start_new_thread
from PIL import Image, ImageTk
from base64 import b64decode
from time import time, sleep
from os.path import abspath
from copy import deepcopy
import tkinter.ttk as ttk
import tkinter as tk
import subprocess
import pyperclip
import socket
import struct
import data
import json
outputs = []
threads_num = 120
progress = 0
threads = 0
last_progress = 0
last_progresses = []
class ServerInfoGeter:
def __init__(self) -> None:
self.sock = None
self.timeout = 0.5
self.host = "" # 主机名
self.port = 0 # 端口
def get_info(self, ip: str, port=25565) -> dict | bool:
self.host = ip
self.port = port
# 创建连接
server_address = (ip, port)
try:
self.sock = socket.create_connection(server_address, self.timeout)
except TimeoutError:
log(f"端口:{port}->错误: 端口无响应")
return False
# 发送握手数据包
self.sock.send(self.make_handshake(ip.encode(), port))
# 发送状态请求数据包
self.sock.send(self.make_request())
try:
data = self.read_packet()
response = self.read_json_response(data)
if 'version' in response and 'name' in response['version']:
return response
else:
log(f"端口:{port}->错误: 数据不可用")
self.sock.close()
return False
except Exception as e:
log(f"端口:{port}->错误: {e}")
self.sock.close()
return False
finally:
self.sock.close()
def make_handshake(self, host: str, port: int) -> bytes:
"""创建一个握手包"""
# 第一个字节是数据包ID
# 下一个字节是协议版本
# 下一个字节是主机字符串长度和字符串
# 下两个字节是端口
# 下一个字节是状态
try:
data = b'\x00' + b'\x00' + \
struct.pack(">b", len(host)) + host + \
struct.pack(">h", port-32768) + b'\x01'
except struct.error:
raise Exception("未知错误")
return self.make_packet(data)
def make_request(self) -> bytes:
"""前三个字节是数据包 ID,接下来的字节是数据"""
return self.make_packet(b'\x00')
def read_packet(self) -> bytes:
"""读取包"""
# 读取数据包长度
length = self.decode_varint(self.sock.recv(2))
# 获取足够长的数据
return self.recvall(length)
def read_json_response(self, data: bytes) -> dict:
"""解码数据"""
try:
json_data = data[3:].decode('utf-8', errors='ignore')
return json.loads(json_data)
except UnicodeDecodeError:
raise Exception("解码错误")
except IndexError:
raise Exception("数据过短")
except json.JSONDecodeError:
raise Exception("数据解码错误")
def make_packet(self, data: bytes):
"""制作一个包"""
return struct.pack(">I", len(data)) + data
def decode_varint(self, data):
""" 解析计算机在通信协议中用varint表示数值的方法 """
number = 0
shift = 0
for raw_byte in data:
val_byte = raw_byte & 0x7F
number |= (val_byte << shift)
if raw_byte & 0x80 == 0:
break
shift += 7
return number
def recvall(self, length):
data = b''
while len(data) < length:
more = self.sock.recv(length - len(data))
if not more:
raise EOFError('was expecting %d bytes but got %d bytes before the socket closed' % (
length, len(data)))
data += more
return data
def log(s: str):
loger.insert("end", s)
if len(loger.keys()) > 50:
loger.delete(1.0)
loger.see("end")
def out_s(s: str):
outcomes.insert("end", s)
def get_sub_values(dic: dict, key: str):
result = []
if isinstance(dic, dict):
for k, v in dic.items():
if k == key:
if isinstance(v, list):
result.extend(v)
else:
result.append(v)
elif isinstance(v, dict):
result.extend(get_sub_values(v, key))
elif isinstance(v, list):
for item in v:
if isinstance(item, dict):
result.extend(get_sub_values(item, key))
return result
def find_keys(s: str, dic: dict):
js = dic
nums = s.split("-")
for i in range(len(nums)):
try:
js = get_sub_values(js, nums[i])[0]
except:
return None
return js
def check_server_available(host: str):
out = subprocess.getoutput("ping " + host + " -n 1")
if out.find("丢失 = 0") > 0:
return True
else:
return False
def scanning_thread(host: str, ports: list[int]):
global threads
global progress
scaner = ServerInfoGeter()
process_json = {}
out = None
for port in ports:
out = scaner.get_info(host, port)
progress += 1
if out == False:
continue
process_json = {
"host": host,
"port": port,
"time": time(),
"details": out}
process_json["text"] = find_keys("description-text", process_json)
process_json["maxplayer"] = find_keys("players-max", process_json)
process_json["player"] = find_keys("players-online", process_json)
process_json["version"] = find_keys("version-name", process_json)
process_json["icon"] = find_keys("favicon", process_json)
completions = ["text", "maxplayer", "version", "player"]
for completion in completions:
if process_json.get(completion) == None:
process_json[completion] = "未知"
if process_json.get("icon", "114514") == None:
process_json["icon"] = data.data
else:
process_json["icon"] = process_json["icon"][22:]
out_s(f"IP->{host}:{port},标题->{process_json['text']}")
outputs.append(process_json)
threads -= 1
def _port_scanning():
global outputs
start_scanningb.config(state=tk.DISABLED)
loger.delete(0, "end")
outcomes.delete(0, "end")
outputs = []
log("开始扫描")
start_new_thread(port_scanning, ())
def port_scanning():
global progress
global threads_num
host = host_input.get("1.0", "end-1c")
port = port_input.get("1.0", "end-1c")
threads_str = threads_input.get("1.0", "end-1c")
try: # 尝试将输入的端口转换为int类型
if 65535 > int(port) > 0: # 判断端口是否在0~65535之间
scanning_thread(host, [int(port)])
log("扫描结束")
start_scanningb.config(state=tk.NORMAL)
return
except:
pass
try: # 尝试将输入的线程数转换为int类型
threads_num = int(threads_str)
except:
pass
if not check_server_available(host):
log("服务器不可用")
start_scanningb.config(state=tk.NORMAL)
log("扫描结束")
return
progress = 0
progress_bar["value"] = 0
temp = []
threads = threads_num
log("正在启动线程")
for i in range(1, 65535+1):
temp.append(i)
if len(temp) >= 65535//threads_num: # 判断是否达到线程数
start_new_thread(scanning_thread, (host, deepcopy(temp)))
temp.clear()
sleep(0.1)
start_new_thread(scanning_thread, (host, temp))
log("线程启动完毕")
sleep(0.5)
global last_progresses
global last_progress
while progress < 65530:
sleep(0.1)
try:
speed = sum(last_progresses)/len(last_progresses)
except ZeroDivisionError:
speed = 0
speed = str(speed*10).split(".")[0]
text4.config(text=f"已扫描:{progress}/65535\n速度:{speed}/s")
progress_bar.config(value=(progress/65535)*100)
progress_bar.update()
last_progresses.append(progress-last_progress)
if len(last_progresses) > 50:
last_progresses.pop(0)
last_progress = progress
start_scanningb.config(state=tk.NORMAL)
log("扫描结束")
def get_icon(base: bytes) -> tk.PhotoImage:
data = b64decode(base)
with open("temp.png", "wb") as f: # 写入图片
f.write(data)
with Image.open("temp.png") as image: # 打开图片
image = image.resize((128, 128))
tk_image = ImageTk.PhotoImage(image)
return tk_image
def on_select(_):
temp = outcomes.curselection()
if not temp:
return
ordinal = temp[0]
del temp
infoer = tk.Toplevel(root)
infoer.title("服务器信息")
json_info = outputs[ordinal]
host_and_port = f"{json_info['host']}:{json_info['port']}"
try:
a = json_info["text"]
del a
except:
return
try:
icon = get_icon(json_info["icon"])
tk.Label(infoer, image=icon).pack()
except:
pass
def show_info(text: str, font: int):
tk.Label(infoer, text=text,
font=("Microsoft YaHei UI", font)).pack()
show_info(f"{json_info['text']}", 22)
show_info(f"服务器IP: {host_and_port}", 10)
show_info(f"服务器人数: {json_info['player']}", 10)
show_info(f"服务器最大人数: {json_info['maxplayer']}", 10)
show_info(f"服务器版本: {json_info['version']}", 10)
copy = tk.Button(infoer, text="复制IP",
command=lambda: pyperclip.copy(host_and_port))
copy.pack()
infoer.mainloop()
root = tk.Tk()
root.title("服务器端口扫描&获取信息器")
host_text = tk.Label(text="服务器IP(网址)输入")
host_input = tk.Text(root, height=1, width=45)
port_text = tk.Label(text="服务器端口自定义输入")
port_input = tk.Text(root, height=1, width=45)
threads_text = tk.Label(text="线程数自定义输入")
threads_input = tk.Text(root, height=1, width=25)
start_scanningb = tk.Button(root, text="开始扫描", command=_port_scanning)
text3 = tk.Label(text="日志"+" "*82+"扫描结果"+" "*25)
loger = tk.Listbox(root, height=15, width=35)
outcomes = tk.Listbox(root, height=15, width=65)
scroll_bar = tk.Scrollbar(root, orient="vertical",
command=outcomes.yview)
progress_bar = ttk.Progressbar(
root, orient="horizontal", length=500)
text4 = tk.Label(text=f"已扫描:{progress}/65535\n速度:0/s")
outcomes.bind("<<ListboxSelect>>", on_select)
host_input.insert(tk.END, "cn-jx-plc-1.openfrp.top")
port_input.insert(tk.END, "41949")
threads_input.insert(tk.END, str(threads_num))
outcomes.configure(yscrollcommand=scroll_bar.set)
host_text.pack()
host_input.pack()
port_text.pack()
port_input.pack()
threads_text.pack()
threads_input.pack()
start_scanningb.pack()
progress_bar.pack(side="bottom", fill="x")
text4.pack(side="bottom", fill="x")
text3.pack()
loger.pack(side="left", fill="y")
scroll_bar.pack(side="right", fill="y")
outcomes.pack(fill="x")
root.mainloop()