雪夜无影 2024-10-15 22:24

How to fix a PyTorch deep-learning job that won't run on the server?

I am using deep learning to predict protein sequences. Because the dataset is fairly large, I run the training on a server. A small subset of the dataset trains without problems, but on the full dataset the job fails with distributed-environment initialization errors as soon as two nodes are used. What is going wrong? (I have revised the code many times and keep hitting different errors each time: OOM, NCCL communication failures, etc. The server is the Beijing Super Cloud Computing (北京超算云计算) platform.) The model configuration cannot be changed.


import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
import random
import numpy as np
import os
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from transformers import AutoTokenizer
from ttt import TTTForCausalLM, TTTConfig, TTT_STANDARD_CONFIGS
from torch import amp  # mixed-precision training
from sklearn.model_selection import train_test_split

# Set random seeds
def setup_seed(seed):
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)

# Initialize the distributed environment
def setup_distributed():
    dist.init_process_group(backend='nccl', init_method='env://')
    rank = dist.get_rank()  # global rank
    world_size = dist.get_world_size()  # global world size
    local_rank = rank % torch.cuda.device_count()  # local GPU index on this node
    torch.cuda.set_device(local_rank)  # bind this process to its local GPU
    setup_seed(0)  # all processes use the same seed
    return rank, world_size, local_rank
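# Note: if init_process_group itself is what fails across two nodes, it can help to make the
# rendezvous fail loudly instead of hanging until the default timeout. A minimal sketch
# (assumptions: these must be set before the first collective call, and the TORCH_NCCL_*
# variable name is the one used by recent PyTorch releases; older releases use
# NCCL_ASYNC_ERROR_HANDLING):
#   os.environ.setdefault("NCCL_DEBUG", "INFO")
#   os.environ.setdefault("TORCH_NCCL_ASYNC_ERROR_HANDLING", "1")
#   dist.init_process_group(backend='nccl', init_method='env://',
#                           timeout=timedelta(minutes=30))  # requires: from datetime import timedelta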

rank, world_size, local_rank = setup_distributed()
device = torch.device("cuda", local_rank)

PAD_TOKEN = '[PAD]'
SOS_TOKEN = '[SOS]'
EOS_TOKEN = '[EOS]'
UNK_TOKEN = '[UNK]'

# Every process reads and preprocesses the data
with open('preprocessed_sequences.txt', 'r') as f:
    sequences = [line.strip() for line in f.readlines()]
# Filter out empty sequences
sequences = [seq for seq in sequences if seq]
print(f"Rank {rank}: 共加载了 {len(sequences)} 条序列。")

# Build the vocabulary only on the main process
if rank == 0:
    amino_acids = set(''.join(sequences))
    amino_acids = sorted(list(amino_acids))
    print(f"氨基酸种类:{amino_acids}")

    vocab = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN] + amino_acids
    vocab_to_idx = {token: idx for idx, token in enumerate(vocab)}
    idx_to_vocab = {idx: token for idx, token in enumerate(vocab)}
    vocab_size = len(vocab)
    print(f"词汇表大小:{vocab_size}")
else:
    amino_acids = None
    vocab = None
    vocab_to_idx = None
    idx_to_vocab = None
    vocab_size = None

# Broadcast the vocabulary to all processes
amino_acids = [amino_acids]
vocab = [vocab]
vocab_to_idx = [vocab_to_idx]
idx_to_vocab = [idx_to_vocab]
vocab_size = [vocab_size]

dist.broadcast_object_list(amino_acids, src=0)
dist.broadcast_object_list(vocab, src=0)
dist.broadcast_object_list(vocab_to_idx, src=0)
dist.broadcast_object_list(idx_to_vocab, src=0)
dist.broadcast_object_list(vocab_size, src=0)

amino_acids = amino_acids[0]
vocab = vocab[0]
vocab_to_idx = vocab_to_idx[0]
idx_to_vocab = idx_to_vocab[0]
vocab_size = vocab_size[0]

# Sequence encoding function
def encode_sequence(seq):
    return [vocab_to_idx.get(char, vocab_to_idx[UNK_TOKEN]) for char in seq]

# Encode all sequences
encoded_sequences = [encode_sequence(seq) for seq in sequences]

# Use the same random seed so the train/val split is identical on every rank
indices = np.arange(len(encoded_sequences))
train_indices, val_indices = train_test_split(indices, test_size=0.2, random_state=42)

train_seqs_full = [encoded_sequences[i] for i in train_indices]
val_seqs_full = [encoded_sequences[i] for i in val_indices]
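# Note: every rank keeps the raw sequences, the encoded sequences, and these train/val copies in
# host memory at the same time; with four processes per node this multiplies per-node RAM usage,
# which can contribute to out-of-memory kills on the full dataset (an observation about this
# script, not a confirmed cause of the reported OOM).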

chunk_size = 1000  # number of sequences per chunk

# Number of chunks in the training set
num_chunks = (len(train_seqs_full) + chunk_size - 1) // chunk_size
print(f"Rank {rank}: 训练集将被分成 {num_chunks} 个子集。")

# Custom dataset
class ProteinDataset(Dataset):
    def __init__(self, sequences, max_length=2048):
        self.sequences = sequences
        self.max_length = max_length

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq = self.sequences[idx]
        max_seq_len = random.randint(100, self.max_length - 2)
        seq = seq[:max_seq_len - 2]
        seq_input = [vocab_to_idx[SOS_TOKEN]] + seq + [vocab_to_idx[EOS_TOKEN]]
        seq_input += [vocab_to_idx[PAD_TOKEN]] * (self.max_length - len(seq_input))
        return torch.tensor(seq_input)

def collate_fn(batch):
    input_ids = torch.stack(batch)
    labels = input_ids.clone()
    return input_ids, labels
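# Note: __getitem__ pads every sample to the full max_length (2048), so each batch is always
# 2048 tokens long regardless of the sampled truncation length, which sets the peak GPU memory.
# Also, labels is a plain copy of input_ids, so the loss is computed on PAD positions unless the
# model ignores them; if the TTT loss follows the usual Hugging Face convention of ignoring
# label id -100 (an assumption -- check the ttt package), a sketch would be:
#   labels[input_ids == vocab_to_idx[PAD_TOKEN]] = -100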

# Model configuration
config = TTTConfig(
    vocab_size=vocab_size,
    **TTT_STANDARD_CONFIGS['350m'],
    max_position_embeddings=2048,
    ttt_layer_type='linear',
    ttt_base_lr=1.0,
    pre_conv=False,
    conv_kernel=4,
)

# Resume from a saved checkpoint if one exists
if os.path.isfile('ttt_model_complete.pth'):
    print(f"进程 {rank}:检测到已保存的模型,加载模型...")
    model = TTTForCausalLM(config)
    checkpoint = torch.load('ttt_model_complete.pth', map_location=device)
    model.load_state_dict(checkpoint['model'])
    start_epoch = checkpoint.get('epoch', 0)
    best_val_loss = checkpoint.get('loss', float('inf'))
    print(f"加载模型,epoch {start_epoch},loss: {best_val_loss:.4f}")
else:
    print(f"进程 {rank}:未检测到已保存的模型,初始化新模型...")
    model = TTTForCausalLM(config)
    start_epoch = 0
    best_val_loss = float('inf')

model = model.to(device)
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

# Optimizer and learning-rate scheduler
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

# Gradient scaler for mixed-precision training
scaler = amp.GradScaler()

# Early-stopping and training bookkeeping
early_stopping_patience = 3
early_stopping_counter = 0
train_losses = []
val_losses = []
num_epochs = 4

# Checkpoint-saving helper
def save_model(epoch, model, loss, file_path='ttt_model_complete.pth'):
    if rank == 0:
        state = {
            'epoch': epoch,
            'model': model.state_dict(),
            'loss': loss
        }
        torch.save(state, file_path)
        print(f"模型已保存在 epoch {epoch},路径为: {file_path}")

# Training loop
for epoch in range(start_epoch, num_epochs):
    model.train()
    epoch_loss = 0

    # Process the training data chunk by chunk
    for chunk_idx in range(num_chunks):
        start_idx = chunk_idx * chunk_size
        end_idx = min((chunk_idx + 1) * chunk_size, len(train_seqs_full))
        train_seqs = train_seqs_full[start_idx:end_idx]
        print(f"Rank {rank}: 正在处理训练数据子集 {chunk_idx + 1}/{num_chunks}, 包含 {len(train_seqs)} 条序列。")

        train_dataset = ProteinDataset(train_seqs)
        train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank, shuffle=True)
        train_loader = DataLoader(train_dataset, batch_size=2, sampler=train_sampler, collate_fn=collate_fn)

        train_sampler.set_epoch(epoch + chunk_idx)

        if rank == 0:
            progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs} Chunk {chunk_idx + 1}/{num_chunks}", leave=False)
        else:
            progress_bar = train_loader

        for input_ids, labels in progress_bar:
            input_ids = input_ids.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            with amp.autocast(device_type='cuda'):
                outputs = model(input_ids=input_ids, labels=labels)
                loss = outputs.loss

            scaler.scale(loss).backward()
            scaler.unscale_(optimizer)  # unscale gradients first so clipping applies to the true gradient norm
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            scaler.step(optimizer)
            scaler.update()

            epoch_loss += loss.item()
            if rank == 0:
                progress_bar.set_postfix(loss=f"{loss.item():.4f}")

    avg_loss = epoch_loss / (len(train_seqs_full))
    if rank == 0:
        train_losses.append(avg_loss)
        print(f"第 {epoch + 1}/{num_epochs} 轮,训练损失:{avg_loss:.4f}")

    model.eval()
    val_loss = 0

    val_chunk_size = 1000
    val_num_chunks = (len(val_seqs_full) + val_chunk_size - 1) // val_chunk_size

    for val_chunk_idx in range(val_num_chunks):
        val_start_idx = val_chunk_idx * val_chunk_size
        val_end_idx = min((val_chunk_idx + 1) * val_chunk_size, len(val_seqs_full))
        val_seqs = val_seqs_full[val_start_idx:val_end_idx]
        print(f"Rank {rank}: 正在处理验证数据子集 {val_chunk_idx + 1}/{val_num_chunks}, 包含 {len(val_seqs)} 条序列。")

        val_dataset = ProteinDataset(val_seqs)
        val_sampler = DistributedSampler(val_dataset, num_replicas=world_size, rank=rank)
        val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, sampler=val_sampler, collate_fn=collate_fn)

        with torch.no_grad():
            for input_ids, labels in val_loader:
                input_ids = input_ids.to(device)
                labels = labels.to(device)
                outputs = model(input_ids=input_ids, labels=labels)
                loss = outputs.loss
                val_loss += loss.item()

    avg_val_loss = val_loss / len(val_seqs_full)
    if rank == 0:
        val_losses.append(avg_val_loss)
        print(f"第 {epoch + 1}/{num_epochs} 轮,验证损失:{avg_val_loss:.4f}")

        scheduler.step(avg_val_loss)

        save_model(epoch + 1, model.module, avg_val_loss)

        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            early_stopping_counter = 0
            torch.save(model.module.state_dict(), 'ttt_language_model_best.pth')
            print("模型已保存,验证损失降低。")
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= early_stopping_patience:
                print(f"验证损失没有改善,提前停止训练。在 epoch {epoch + 1} 停止。")
                break
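    # Note: this break only executes on rank 0; the other ranks never see it and move on to the
    # next epoch, where their DDP all-reduce calls wait forever for rank 0 -- a classic cause of
    # NCCL timeouts. A minimal sketch of a rank-wide stop flag, evaluated by every rank at this
    # indentation level (stop_flag is a hypothetical name):
    #   stop_flag = torch.tensor([int(early_stopping_counter >= early_stopping_patience)], device=device)
    #   dist.broadcast(stop_flag, src=0)
    #   if stop_flag.item() == 1:
    #       break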

# Load the best model for generation
if rank == 0:
    model.module.load_state_dict(torch.load('ttt_language_model_best.pth'))
    model.module.to('cpu')
    model.module.eval()

    def generate_sequence(model, start_tokens, max_length=2048, temperature=1.0):
        model.eval()

        input_ids = torch.tensor(
            [vocab_to_idx[SOS_TOKEN]] + [vocab_to_idx.get(token, vocab_to_idx[UNK_TOKEN]) for token in start_tokens],
            device='cpu'
        ).unsqueeze(0)

        generated_ids = model.generate(
            input_ids=input_ids,
            max_length=max_length,
            temperature=temperature,
            eos_token_id=vocab_to_idx[EOS_TOKEN],
            pad_token_id=vocab_to_idx[PAD_TOKEN],
            do_sample=True,
            top_k=50,
            top_p=0.95
        )

        generated_sequence = ''.join([idx_to_vocab[token_id.item()] for token_id in generated_ids[0][1:]])
        return generated_sequence

    start_tokens = random.choice(amino_acids)
    generated_seq = generate_sequence(model.module, start_tokens)
    print(f"以 '{start_tokens}' 开头生成的序列:{generated_seq}")

Below is the SLURM submission script:

#!/bin/bash
#SBATCH -p 
#SBATCH -N 2   
#SBATCH --gres=gpu:4        # 4 GPUs per node (8 in total across the 2 nodes)
#SBATCH --ntasks=8         # 8 tasks in total
#SBATCH --qos=gpugpu
#SBATCH -t 7-00:00:00       # maximum runtime: 7 days
#SBATCH -o output_%j.log     # stdout log (%j is the job ID)
#SBATCH -e error_%j.log      # stderr log (%j is the job ID)


module load anaconda/2024.02
module load nccl/2.16.5-1_cuda11.8

export MASTER_ADDR=$(scontrol show hostname $SLURM_NODELIST | head -n 1)
export MASTER_PORT=$(shuf -i 10000-65000 -n 1)  # pick a random port in this range for rendezvous


#export NCCL_SOCKET_IFNAME=bond0
#export NCCL_BLOCKING_WAIT=1
#export NCCL_DEBUG=INFO
#export NCCL_TIMEOUT=1800
#export WORLD_SIZE=8


# Launch the training script
torchrun --nnodes=$SLURM_JOB_NUM_NODES --nproc_per_node=4 --node_rank=$SLURM_NODEID --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT  script.py
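
One thing worth double-checking in this submission (an assumption about the launch, not something visible from the posted code alone): the body of an sbatch script runs only on the first allocated node, so calling torchrun directly here starts 4 of the 8 processes, and the env:// rendezvous never sees the second node, which looks exactly like a distributed-initialization failure. A common pattern is to start one torchrun per node through srun; a sketch (flags may need adjusting for the BSCC cluster):

srun --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 \
    torchrun --nnodes=$SLURM_JOB_NUM_NODES --nproc_per_node=4 \
    --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:$MASTER_PORT \
    script.py

With srun, --node_rank=$SLURM_NODEID is no longer needed because torchrun's c10d rendezvous assigns node ranks itself.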


11 answers

  • 阿里嘎多学长 2024-10-15 22:24

    AI-generated:


    Troubleshooting PyTorch distributed initialization errors

    Hi! I'm 阿里嘎多学长, welcome to my Q&A corner. Errors during PyTorch distributed-environment initialization are very common. Below I will try to analyze and resolve your problem.

    Problem analysis

    1. Your dataset is very large, which causes problems when the job runs on the server.
    2. Your code runs fine on a small subset of the data, but fails with a distributed-environment initialization error on the full dataset.

    Suggested fixes

    1. Check the dataset: make sure the data, its format, data types, and distribution are correct.
    2. Check the PyTorch version: make sure you are on an appropriate version; newer releases may fix some issues.
    3. Check the distributed configuration: make sure every node is configured identically.
    4. Check data parallelism: make sure every node processes a comparable amount of data.
    5. Check the GPU configuration: make sure every node has the same GPU setup.

    Example code

    Below is a simple, runnable example showing how to set up a distributed environment with PyTorch:

    import os
    import torch
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Initialize the distributed environment (one process per GPU, launched with torchrun)
    dist.init_process_group(backend='nccl', init_method='env://')
    local_rank = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(local_rank)
    device = torch.device('cuda', local_rank)

    # Create the model and wrap it in DistributedDataParallel
    model = torch.nn.Linear(5, 3).to(device)
    model = DDP(model, device_ids=[local_rank], output_device=local_rank)

    # One dummy training step to verify that gradient synchronization works
    out = model(torch.randn(4, 5, device=device))
    out.sum().backward()

    dist.destroy_process_group()

    How to run

    In the example above, dist.init_process_group sets up the process group and DistributedDataParallel synchronizes gradients across processes. The script must be launched with torchrun (for example: torchrun --nproc_per_node=4 example.py), which sets the RANK, LOCAL_RANK, and WORLD_SIZE environment variables that env:// initialization reads.

    Summary

    Distributed initialization errors can stem from the dataset, the PyTorch version, the distributed configuration, data parallelism, or the GPU setup. Working through these checks and starting from the example code above should help you resolve the problem.


