服务器端代码
import asyncio
import websockets
# Shared game state for the server
board = [" "] * 9  # nine cells; a single space marks an empty cell
current_player = "X"  # mark assigned to the next client and checked when a move arrives
clients = dict()  # player mark -> that player's websocket connection
async def handle_client(websocket, path=None):
    """Serve one client connection for its whole lifetime.

    The client is given the first free mark ("X", then "O"), told which
    player it is, sent the current board, and then every message it sends is
    passed to ``handle_message``. When both marks are taken the connection is
    closed instead of overwriting an existing player.

    Args:
        websocket: The connection object supplied by the websockets server.
        path: Request path. Unused; optional so the handler can be called
            with or without it.
    """
    print("Client connected")
    # Pick a free seat instead of reusing the turn variable, which changes
    # during play and would hand out the wrong mark or overwrite a player.
    player = next((mark for mark in ("X", "O") if mark not in clients), None)
    if player is None:
        await websocket.close(code=1013, reason="game is full")
        return
    clients[player] = websocket
    try:
        await websocket.send("You are player " + player)
        await broadcast_board()
        async for message in websocket:
            await handle_message(player, message)
    except websockets.exceptions.ConnectionClosed:
        pass
    except Exception as e:
        print("An error occurred:", e, flush=True)
    finally:
        # Only free the seat if it still belongs to this connection.
        if clients.get(player) is websocket:
            del clients[player]
async def handle_message(player, message):
    """Validate and apply one message sent by ``player``.

    Only ``"move <anything> <index>"`` messages are acted on; the middle
    field is ignored because the mover is identified by the connection.
    Anything malformed, out of range, on an occupied cell or out of turn is
    dropped silently so a bad message cannot tear down the connection.

    For an accepted move the board and a ``"move <player> <index>"`` message
    are broadcast. A win additionally broadcasts ``"win <player>"`` and a
    full board ``"draw"``; both then reset the game. Otherwise the turn
    passes to the other player.

    Args:
        player: Mark ("X" or "O") of the connection the message came from.
        message: Raw message received from that connection.
    """
    global current_player
    if not isinstance(message, str):
        return
    parts = message.split()
    if len(parts) != 3 or parts[0] != "move":
        return
    try:
        index = int(parts[2])
    except ValueError:
        return
    if not 0 <= index < len(board):
        return
    if board[index] != " " or player != current_player:
        return

    board[index] = player
    won = check_winner()
    drawn = not won and " " not in board
    if not (won or drawn):
        # Pass the turn before the first await so the opponent's reply is
        # never judged against a stale turn. The original never advanced it.
        current_player = "O" if player == "X" else "X"

    await broadcast_board()
    # Always announce the move, including the one that ends in a draw, so
    # every client sees the final position.
    await broadcast_message(f"move {player} {index}")
    if won:
        await broadcast_message(f"win {player}")
        await reset_board()
    elif drawn:
        await broadcast_message("draw")
        await reset_board()
async def broadcast_board():
    """Send the current board to every connected client.

    The message is ``"board "`` followed by the nine cells joined by single
    spaces. A failed send to one client does not stop delivery to the others
    and does not propagate to the caller; failures other than an already
    closed connection are printed.
    """
    board_message = "board " + " ".join(board)
    outcomes = await asyncio.gather(
        *[client.send(board_message) for client in list(clients.values())],
        return_exceptions=True,
    )
    for outcome in outcomes:
        if isinstance(outcome, Exception) and not isinstance(
            outcome, websockets.exceptions.ConnectionClosed
        ):
            print("Broadcast failed:", outcome, flush=True)
async def broadcast_message(message):
    """Send ``message`` to every connected client.

    A failed send to one client does not stop delivery to the others and
    does not propagate to the caller; failures other than an already closed
    connection are printed.

    Args:
        message: Text to send unchanged to each client.
    """
    outcomes = await asyncio.gather(
        *[client.send(message) for client in list(clients.values())],
        return_exceptions=True,
    )
    for outcome in outcomes:
        if isinstance(outcome, Exception) and not isinstance(
            outcome, websockets.exceptions.ConnectionClosed
        ):
            print("Broadcast failed:", outcome, flush=True)
def check_winner():
    """Return True when a row, column or diagonal of ``board`` holds three equal marks."""
    winning_lines = (
        (0, 3, 6), (1, 4, 7), (2, 5, 8),  # columns
        (0, 1, 2), (3, 4, 5), (6, 7, 8),  # rows
        (0, 4, 8), (2, 4, 6),             # diagonals
    )
    return any(
        board[a] != " " and board[a] == board[b] == board[c]
        for a, b, c in winning_lines
    )
async def reset_board():
    """Start a fresh game: blank the board, set the turn to X and broadcast the empty board."""
    global board, current_player
    board, current_player = [" "] * 9, "X"
    await broadcast_board()
# Start the WebSocket server
async def run_server():
    """Listen on ws://localhost:8888 and serve clients until the process is stopped."""
    # The server only starts listening once the serve() object is entered
    # (or awaited) inside a running event loop; merely constructing it and
    # spinning an empty loop, as before, never opens the port.
    async with websockets.serve(handle_client, "localhost", 8888):
        print("WebSocket server is running on ws://localhost:8888")
        await asyncio.Future()  # never completes: keeps the server alive


if __name__ == "__main__":
    asyncio.run(run_server())
客户端代码
import tkinter as tk
from tkinter import messagebox
import asyncio
import websockets
import threading
import concurrent.futures
# Create the main window
root = tk.Tk()
root.title("井字游戏")
# Local game state
board = [" "] * 9  # nine cells; a single space marks an empty cell
current_player = "X"  # mark placed by the next click
player_X_score = player_O_score = 0
game_mode = "双人游戏"
# Address of the WebSocket game server (adjust to the real deployment)
websocket_server = "ws://localhost:8888/ws"
def check_winner():
    """Return True when a row, column or diagonal of ``board`` holds three equal marks."""
    triples = [(col, col + 3, col + 6) for col in range(3)]
    triples += [(row, row + 1, row + 2) for row in (0, 3, 6)]
    triples += [(0, 4, 8), (2, 4, 6)]
    for first, second, third in triples:
        mark = board[first]
        if mark != " " and mark == board[second] == board[third]:
            return True
    return False
async def handle_message(message):
    """Apply one message received from the server to the local game.

    Handled messages:

    * ``"move <player> <index>"``: place the mark, then score and announce
      a win, announce a draw, or pass the turn. A move already on the local
      board is skipped, because the server echoes the sender's own move back
      and it was scored when it was clicked.
    * ``"board"`` with nine empty cells: the server started a new game, so
      the local board is reset as well.

    Everything else, including malformed messages, is ignored so that a bad
    message cannot stop the receive loop.

    Args:
        message: Raw message as received from the websocket.
    """
    global current_player, player_X_score, player_O_score
    if not isinstance(message, str):
        return
    parts = message.split()
    if parts == ["board"]:
        # Empty cells are spaces, so a blank board leaves nothing after
        # split(). Without this reset no move is possible after a game ends.
        reset_board()
        return
    if len(parts) != 3 or parts[0] != "move":
        return
    player = parts[1]
    try:
        index = int(parts[2])
    except ValueError:
        return
    if player not in ("X", "O") or not 0 <= index < len(board):
        return
    if board[index] == player:
        return  # echo of a move that was already applied and scored locally

    board[index] = player
    buttons[index].config(text=player)
    if check_winner():
        if player == "X":
            player_X_score += 1
        else:
            player_O_score += 1
        update_score()
        await show_game_result(f"玩家 {player} 赢了!")
    elif " " not in board:
        await show_game_result("平局!")
    else:
        current_player = "O" if player == "X" else "X"
async def show_game_result(message):
    """Show ``message`` in the end-of-game information dialog."""
    dialog_title = "游戏结束"
    messagebox.showinfo(dialog_title, message)
async def handle_click(index):
    """Play the current player's mark on cell ``index``.

    Nothing happens if the cell is taken or the game is already won.
    Otherwise the mark is placed, a win or draw is scored and announced, or
    the turn passes on. In single-player mode the computer answers a move by
    X. In two-player mode the move is sent to the server.

    Args:
        index: Board cell (0-8) that was clicked.
    """
    # NOTE(review): the client does not track which mark the server assigned
    # to it, so a click while it is the opponent's turn is applied locally
    # but rejected by the server. Confirm whether clicks should be limited
    # to the client's own mark.
    global current_player, player_X_score, player_O_score
    if board[index] != " " or check_winner():
        return

    # Remember who moved: the turn flips below, and the outgoing message
    # must name the player that actually made this move.
    player = current_player
    board[index] = player
    buttons[index].config(text=player)

    if check_winner():
        if player == "X":
            player_X_score += 1
        else:
            player_O_score += 1
        update_score()
        await show_game_result(f"玩家 {player} 赢了!")
    elif " " not in board:
        await show_game_result("平局!")
    else:
        current_player = "O" if player == "X" else "X"
        # Only answer the human (X). Without this check the computer's own
        # move would trigger another computer move, playing both sides.
        if game_mode == "单人游戏" and player == "X":
            pending = computer_move()
            # computer_move is a plain function; awaiting its result
            # unconditionally raises TypeError when it returns None.
            if asyncio.isfuture(pending):
                await pending

    # Games against the local computer are not the server's business.
    if game_mode != "单人游戏":
        await send_message(f"move {player} {index}")
async def send_message(message):
    """Send ``message`` to the game server.

    If the module-level name ``server_connection`` holds a live connection,
    the message is sent on it, so the server sees the move coming from the
    same player that receives the updates. Otherwise a short-lived
    connection is opened for this one message. Connection problems are
    printed instead of being raised into the click handler.

    Args:
        message: Text to send to the server.
    """
    # NOTE(review): the server treats every new connection as a new player,
    # so the one-shot fallback only makes sense when nothing else holds a
    # connection. Confirm whether it should be dropped.
    shared = globals().get("server_connection")
    try:
        if shared is not None:
            await shared.send(message)
        else:
            async with websockets.connect(websocket_server) as websocket:
                await websocket.send(message)
    except (OSError, websockets.exceptions.WebSocketException) as exc:
        print("Could not send message:", exc, flush=True)
# Strong references to scheduled computer moves; the event loop itself only
# keeps weak references to tasks.
_computer_move_tasks = set()


def computer_move():
    """Let the computer play a random empty cell.

    ``handle_click`` is a coroutine function, so calling it only creates a
    coroutine object. The move is therefore scheduled on the running event
    loop instead of being called and discarded.

    Returns:
        The scheduled task, which may be awaited to wait for the move, or
        None when the board has no empty cell.
    """
    import random

    empty_cells = [i for i, cell in enumerate(board) if cell == " "]
    if not empty_cells:
        return None
    task = asyncio.ensure_future(handle_click(random.choice(empty_cells)))
    _computer_move_tasks.add(task)
    task.add_done_callback(_computer_move_tasks.discard)
    return task
def update_score():
    """Refresh both score labels from the current score counters."""
    x_text = f"玩家 X 分数: {player_X_score}"
    o_text = f"玩家 O 分数: {player_O_score}"
    player_X_score_label.configure(text=x_text)
    player_O_score_label.configure(text=o_text)
def change_game_mode():
    """Adopt the mode selected with the radio buttons, tell the user, and start a new game."""
    global game_mode
    selected_mode = game_mode_var.get()
    game_mode = selected_mode
    messagebox.showinfo("切换游戏模式", f"已切换到{game_mode}。")
    reset_board()
def reset_board():
    """Start a new local game: empty board, X to move, blank buttons, scores redrawn."""
    global board, current_player
    board, current_player = [" "] * 9, "X"
    for cell_button in buttons:
        cell_button.configure(text=" ")
    update_score()
# Strong references to scheduled click handlers; the event loop itself only
# keeps weak references to tasks.
_click_tasks = set()


def _on_cell_clicked(index):
    """Tk callback: schedule the async click handler on the asyncio event loop."""
    # handle_click is a coroutine function; calling it directly from a Tk
    # command only creates a coroutine object that never runs.
    task = asyncio.ensure_future(handle_click(index))
    _click_tasks.add(task)
    task.add_done_callback(_click_tasks.discard)


# Create the nine board buttons in a 3x3 grid
buttons = []
for i in range(9):
    button = tk.Button(root, text=" ", font=("Arial", 24), width=3, height=1,
                       command=lambda i=i: _on_cell_clicked(i))
    button.grid(row=i // 3, column=i % 3)
    buttons.append(button)
# Game mode selection
game_mode_var = tk.StringVar()
game_mode_var.set("双人游戏")
game_mode_label = tk.Label(root, text="选择游戏模式:")
game_mode_label.grid(row=3, column=0, columnspan=2)
single_player_button = tk.Radiobutton(root, text="单人游戏", variable=game_mode_var, value="单人游戏", command=change_game_mode)
single_player_button.grid(row=3, column=2)
two_player_button = tk.Radiobutton(root, text="双人游戏", variable=game_mode_var, value="双人游戏", command=change_game_mode)
two_player_button.grid(row=3, column=3)
# Score display
player_X_score_label = tk.Label(root, text=f"玩家 X 分数: {player_X_score}")
player_X_score_label.grid(row=4, column=0, columnspan=2)
player_O_score_label = tk.Label(root, text=f"玩家 O 分数: {player_O_score}")
player_O_score_label.grid(row=4, column=2, columnspan=2)
async def connect_to_server():
    """Hold the connection to the game server and dispatch incoming messages.

    While the connection is open it is stored in the module-level name
    ``server_connection`` so that outgoing messages can use the same
    connection; the name is set back to None when the connection ends.
    Every received message is passed to ``handle_message``. An unreachable
    server or a dropped connection is printed instead of ending the task
    with an unretrieved exception.
    """
    global server_connection
    try:
        async with websockets.connect(websocket_server) as websocket:
            server_connection = websocket
            try:
                async for message in websocket:
                    await handle_message(message)
            finally:
                server_connection = None
    except (OSError, websockets.exceptions.WebSocketException) as exc:
        print("Server connection lost:", exc, flush=True)
async def main():
    """Run the Tk window and the server connection on one asyncio event loop.

    ``root.mainloop()`` would block inside this coroutine and the event loop
    would never get to run the connection task. Instead, Tk events are
    processed in short slices between which asyncio gets control.
    """
    receiver = asyncio.create_task(connect_to_server())
    try:
        while True:
            root.update()  # handle pending Tk events without blocking
            await asyncio.sleep(0.01)  # let asyncio tasks run
    except tk.TclError:
        pass  # update() raises this once the window has been destroyed
    finally:
        receiver.cancel()


if __name__ == "__main__":
    asyncio.run(main())
怎样修改代码让我的客户端能够正常运行?