用Pygame写了一个贪吃蛇多人游戏,但是完全跑不起来。
代码如下。
服务端
import asyncio
import socket
import pickle
# 服务器地址和端口
SERVER_ADDRESS = '127.0.0.1'
SERVER_PORT = 2000
# 定义玩家ID计数器
player_id_counter = 1
# 初始化事件循环和服务器数据
loop = asyncio.get_event_loop()
game_state = {'player_snakes': {}}
# 辅助函数:处理客户端消息
def handle_client_message(client_socket, address):
global player_id_counter
try:
while True:
# 接收客户端消息
message = client_socket.recv(1024).decode()
if message == "GET_PLAYER_ID":
# 分配玩家ID
player_id = player_id_counter
player_id_counter += 1
# 发送玩家ID
client_socket.sendall(str(player_id).encode())
else:
# 更新玩家蛇的位置
player_snake = game_state['player_snakes'].get(player_id, [])
player_snake.clear()
# 解析消息并更新蛇位置
parts = message.split(",")
for i in range(0, len(parts) - 1, 2):
x = int(parts[i])
y = int(parts[i + 1])
player_snake.append((x, y))
# 序列化游戏状态并发送给客户端
serialized_game_state = pickle.dumps(game_state)
client_socket.sendall(serialized_game_state)
except (ConnectionResetError, ConnectionAbortedError):
# 处理客户端连接断开的情况
print(f"客户端 {address} 连接已关闭")
if player_id in game_state['player_snakes']:
del game_state['player_snakes'][player_id]
# 异步任务:监听客户端连接
async def listen_for_clients():
# 创建服务器Socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((SERVER_ADDRESS, SERVER_PORT))
server_socket.listen()
print("服务器已启动,等待客户端连接...")
while True:
# 接受客户端连接
client_socket, address = await loop.sock_accept(server_socket)
print(f"客户端 {address} 已连接")
# 创建任务以处理客户端消息
asyncio.create_task(handle_client_message(client_socket, address))
# 启动异步任务
loop.run_until_complete(listen_for_clients())
客户端
import asyncio
import socket
import pickle
import pygame
# 服务器地址和端口
SERVER_ADDRESS = '127.0.0.1'
SERVER_PORT = 2000
# 游戏区域大小
GRID_SIZE = 20
GRID_WIDTH = 30
GRID_HEIGHT = 30
# 蛇的移动速度
SNAKE_SPEED = 0.1
# 初始化Pygame
pygame.init()
# 创建游戏区域窗口
window_size = (GRID_WIDTH * GRID_SIZE, GRID_HEIGHT * GRID_SIZE)
window = pygame.display.set_mode(window_size)
pygame.display.set_caption('贪吃蛇')
# 定义颜色
WHITE = (255, 255, 255)
GREEN = (0, 255, 0)
# 辅助函数:绘制游戏区域
def draw_grid():
for x in range(0, window_size[0], GRID_SIZE):
pygame.draw.line(window, WHITE, (x, 0), (x, window_size[1]))
for y in range(0, window_size[1], GRID_SIZE):
pygame.draw.line(window, WHITE, (0, y), (window_size[0], y))
# 辅助函数:绘制蛇
def draw_snake(snake_positions):
for snake_part in snake_positions:
x, y = snake_part
pygame.draw.rect(window, GREEN, (x * GRID_SIZE, y * GRID_SIZE, GRID_SIZE, GRID_SIZE))
# 创建Socket连接
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 用于异步任务的函数
async def sync_game_state():
try:
# 连接到服务器
client_socket.connect((SERVER_ADDRESS, SERVER_PORT))
print("已连接到服务器")
# 发送获取玩家ID的请求
client_socket.sendall(b"GET_PLAYER_ID")
# 接收玩家ID
player_id = int(client_socket.recv(1024))
while True:
# 接收更新后的游戏状态
serialized_game_state = await loop.sock_recv(client_socket, 4096)
game_state = pickle.loads(serialized_game_state, encoding='latin1')
# 获取本玩家的蛇位置
player_snake = game_state['player_snakes'].get(player_id, [])
# 清空窗口
window.fill((0, 0, 0))
# 绘制游戏区域网格
draw_grid()
# 绘制蛇
draw_snake(player_snake)
# 更新显示
pygame.display.flip()
await asyncio.sleep(0.1)
except ConnectionRefusedError:
print("连接失败,正在尝试重新连接...")
# 绘制重新连接提示
font = pygame.font.Font(None, 36)
text = font.render("重新连接失败,正在尝试重新连接...", True, WHITE)
text_rect = text.get_rect(center=(window_size[0] // 2, window_size[1] // 2))
window.blit(text, text_rect)
pygame.display.flip()
await asyncio.sleep(1)
await sync_game_state()
# 异步任务的主函数
async def main():
await asyncio.gather(sync_game_state())
# 启动异步任务
pygame.display.set_mode(window_size)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
求解答