‖热风海陆 2023-09-01 16:00 采纳率: 0%
浏览 25
已结题

如何让异步同步函数都能运行

服务器端代码

import asyncio
import websockets



# 初始化游戏
board = [" " for _ in range(9)]  # 初始为空格
current_player = "X"  # 当前玩家
clients = {}  # 存储客户端连接


# 处理客户端连接

async def handle_client(websocket, path):
    print("Client connected")
    try:
        global current_player
        player = current_player
        clients[player] = websocket
        current_player = "O" if current_player == "X" else "X"

  
        await websocket.send("You are player " + player)
        await broadcast_board()
        
        async for message in websocket:
            await handle_message(player, message)

    except websockets.exceptions.ConnectionClosed:
        pass
    except Exception as e:
        print("An error occurred:", e, flush=True)
    finally:
        del clients[player]


# 处理客户端发送的消息
async def handle_message(player, message):
    if message.startswith("move"):
        _, _, index = message.split()
        index = int(index)
        
        if board[index] == " " and player == current_player:
            board[index] = player
            await broadcast_board()

            if check_winner():
                await broadcast_message(f"move {player} {index}")
                await broadcast_message(f"win {player}")
                await reset_board()
            elif " " not in board:
                await broadcast_message("draw")
                await reset_board()
            else:
                await broadcast_message(f"move {player} {index}")

# 广播游戏板给所有客户端
async def broadcast_board():
    board_message = "board " + " ".join(board)
    await asyncio.gather(*[client.send(board_message) for client in clients.values()])

# 广播消息给所有客户端
async def broadcast_message(message):
    await asyncio.gather(*[client.send(message) for client in clients.values()])

# 检查游戏是否结束
def check_winner():
    # 检查行、列和对角线
    for i in range(3):
        if board[i] == board[i + 3] == board[i + 6] != " ":
            return True
        if board[i * 3] == board[i * 3 + 1] == board[i * 3 + 2] != " ":
            return True
    if board[0] == board[4] == board[8] != " " or board[2] == board[4] == board[6] != " ":
        return True
    return False

# 重置游戏板
async def reset_board():
    global board, current_player
    board = [" " for _ in range(9)]
    current_player = "X"
    await broadcast_board()

# 启动WebSocket服务器

start_server = websockets.serve(handle_client, "localhost", 8888)
print("WebSocket server is running on ws://localhost:8888")
asyncio.get_event_loop().run_forever()



客户端代码

import tkinter as tk
from tkinter import messagebox
import asyncio
import websockets
import threading
import concurrent.futures


# 初始化游戏
root = tk.Tk()
root.title("井字游戏")

# 创建井字游戏板
board = [" " for _ in range(9)]  # 初始为空格
current_player = "X"  # 当前玩家
player_X_score = 0
player_O_score = 0
game_mode = "双人游戏"

# WebSocket服务器地址
websocket_server = "ws://localhost:8888/ws"  # 根据实际地址设置



# 检查游戏是否结束
def check_winner():
    # 检查行、列和对角线
    for i in range(3):
        if board[i] == board[i + 3] == board[i + 6] != " ":
            return True
        if board[i * 3] == board[i * 3 + 1] == board[i * 3 + 2] != " ":
            return True
    if board[0] == board[4] == board[8] != " " or board[2] == board[4] == board[6] != " ":
        return True
    return False

# 异步函数,处理服务器或其他玩家发送的消息
async def handle_message(message):
    global board, current_player, player_X_score, player_O_score

    if message.startswith("move"):
        _, player, index = message.split()
        index = int(index)
        board[index] = player
        buttons[index].config(text=player)
        if check_winner():
            if player == "X":
                player_X_score += 1
            else:
                player_O_score += 1
            update_score()
            await show_game_result(f"玩家 {player} 赢了!")
        elif " " not in board:
            await show_game_result("平局!")
        else:
            current_player = "O" if player == "X" else "X"

# 异步函数,弹出游戏结果信息
async def show_game_result(message):
    messagebox.showinfo("游戏结束", message)
    
# 处理玩家点击事件
async def handle_click(index):
    global current_player, game_mode, player_X_score, player_O_score

    if board[index] == " " and not check_winner():
        board[index] = current_player
        buttons[index].config(text=current_player)
        if check_winner():
            if current_player == "X":
                player_X_score += 1
            else:
                player_O_score += 1
            update_score()
            await show_game_result(f"玩家 {current_player} 赢了!")
        elif " " not in board:
            await show_game_result("平局!")
        else:
            current_player = "O" if current_player == "X" else "X"
            if game_mode == "单人游戏" and not check_winner():
                await computer_move()

        move_message = f"move {current_player} {index}"
        await send_message(move_message)  # 使用await调用send_message函数



# 发送消息到服务器
async def send_message(message):
    uri = websocket_server
    async with websockets.connect(uri) as websocket:
        await websocket.send(message)

# 计算机下棋(简单随机选择空白位置)
def computer_move():
    import random
    empty_cells = [i for i, cell in enumerate(board) if cell == " "]
    if empty_cells:
        random_index = random.choice(empty_cells)
        handle_click(random_index)

# 更新分数显示
def update_score():
    player_X_score_label.config(text=f"玩家 X 分数: {player_X_score}")
    player_O_score_label.config(text=f"玩家 O 分数: {player_O_score}")

# 切换游戏模式
def change_game_mode():
    global game_mode
    game_mode = game_mode_var.get()
    messagebox.showinfo("切换游戏模式", f"已切换到{game_mode}。")
    reset_board()

# 重新开始游戏
def reset_board():
    global board, current_player
    board = [" " for _ in range(9)]
    current_player = "X"
    for button in buttons:
        button.config(text=" ")
    update_score()

# 创建按钮
buttons = []
for i in range(9):
    button = tk.Button(root, text=" ", font=("Arial", 24), width=3, height=1,
                       command=lambda i=i: handle_click(i))  # 在线程中运行异步函数
    button.grid(row=i // 3, column=i % 3)
    buttons.append(button)



# 游戏模式选择
game_mode_var = tk.StringVar()
game_mode_var.set("双人游戏")
game_mode_label = tk.Label(root, text="选择游戏模式:")
game_mode_label.grid(row=3, column=0, columnspan=2)
single_player_button = tk.Radiobutton(root, text="单人游戏", variable=game_mode_var, value="单人游戏", command=change_game_mode)
single_player_button.grid(row=3, column=2)
two_player_button = tk.Radiobutton(root, text="双人游戏", variable=game_mode_var, value="双人游戏", command=change_game_mode)
two_player_button.grid(row=3, column=3)

# 分数显示
player_X_score_label = tk.Label(root, text=f"玩家 X 分数: {player_X_score}")
player_X_score_label.grid(row=4, column=0, columnspan=2)
player_O_score_label = tk.Label(root, text=f"玩家 O 分数: {player_O_score}")
player_O_score_label.grid(row=4, column=2, columnspan=2)

async def connect_to_server():
    uri = websocket_server
    async with websockets.connect(uri) as websocket:
        while True:  # 不再需要退出条件
            message = await websocket.recv()
            await handle_message(message)



async def main():
    loop = asyncio.get_event_loop()
    
    # 异步运行 connect_to_server(),不会阻塞事件循环
    asyncio.create_task(connect_to_server())
    
    root.mainloop()

if __name__ == "__main__":
    asyncio.run(main())


怎样修改代码让我的客户端能够正常运行

  • 写回答

12条回答 默认 最新

  • bug菌¹ 优质创作者: Java、算法与数据结构技术领域 2023-09-03 09:35
    关注
    获得1.05元问题酬金

    题主,这个问题我来替你解决(参考结合AI智能、文心一言),若有帮助,还望采纳,点击回答右侧采纳即可。


    可以使用协程(coroutine)来实现。协程是一种比线程更轻量级的并发执行机制,可以在同一线程内实现多个任务的切换执行,从而实现异步和同步函数的并发执行。

    在 Python 中,可以使用 asyncio 模块来实现协程并发执行。通过 asyncio 的提供的 event loop(事件循环)机制,我们可以将多个异步任务注册到事件循环中,并且在需要的时候,通过 await 关键字来切换执行不同的任务。

    同时,我们可以使用 asyncio 同步函数来包装原本的同步函数,从而将其转化为可以在协程中执行的异步函数。例如,可以使用 asyncio 的 run_in_executor 函数来将同步函数在一个线程池中执行,从而避免阻塞事件循环。

    总之,通过协程的机制,可以很方便地实现异步和同步函数的并发执行。

    评论

报告相同问题?

问题事件

  • 系统已结题 9月9日
  • 赞助了问题酬金15元 9月1日
  • 创建了问题 9月1日