正在利用深度学习来进行蛋白质序列的预测。因为数据集比较大,所以在服务器上跑。取了数据集中一小部分跑起来没问题,但是完整数据集在服务器上调用两个节点就会出现分布式环境初始化的报错。请问是出了什么问题?(改了很多次代码,出现不同的问题:OOM/NCCL通信的问题等;服务器用的北京超算云计算)模型的配置不能变
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
import random
import numpy as np
import os
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from transformers import AutoTokenizer
from ttt import TTTForCausalLM, TTTConfig, TTT_STANDARD_CONFIGS
from torch import amp # 混合精度训练
from sklearn.model_selection import train_test_split
# 设置随机种子
def setup_seed(seed):
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
# 初始化分布式环境
def setup_distributed():
dist.init_process_group(backend='nccl', init_method='env://')
rank = dist.get_rank() # 获取全局 rank
world_size = dist.get_world_size() # 获取全局 world_size
local_rank = rank % torch.cuda.device_count() # 计算每个节点的局部 GPU rank
torch.cuda.set_device(local_rank) # 设置 GPU 设备为 local_rank
setup_seed(0) # 所有进程使用相同的种子
return rank, world_size, local_rank
rank, world_size, local_rank = setup_distributed()
device = torch.device("cuda", local_rank)
PAD_TOKEN = '[PAD]'
SOS_TOKEN = '[SOS]'
EOS_TOKEN = '[EOS]'
UNK_TOKEN = '[UNK]'
# 所有进程都读取和处理数据
with open('preprocessed_sequences.txt', 'r') as f:
sequences = [line.strip() for line in f.readlines()]
# 过滤掉空序列
sequences = [seq for seq in sequences if seq]
print(f"Rank {rank}: 共加载了 {len(sequences)} 条序列。")
# 仅在主进程中构建词汇表
if rank == 0:
amino_acids = set(''.join(sequences))
amino_acids = sorted(list(amino_acids))
print(f"氨基酸种类:{amino_acids}")
vocab = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN] + amino_acids
vocab_to_idx = {token: idx for idx, token in enumerate(vocab)}
idx_to_vocab = {idx: token for idx, token in enumerate(vocab)}
vocab_size = len(vocab)
print(f"词汇表大小:{vocab_size}")
else:
amino_acids = None
vocab = None
vocab_to_idx = None
idx_to_vocab = None
vocab_size = None
# 广播词汇表到所有进程
amino_acids = [amino_acids]
vocab = [vocab]
vocab_to_idx = [vocab_to_idx]
idx_to_vocab = [idx_to_vocab]
vocab_size = [vocab_size]
dist.broadcast_object_list(amino_acids, src=0)
dist.broadcast_object_list(vocab, src=0)
dist.broadcast_object_list(vocab_to_idx, src=0)
dist.broadcast_object_list(idx_to_vocab, src=0)
dist.broadcast_object_list(vocab_size, src=0)
amino_acids = amino_acids[0]
vocab = vocab[0]
vocab_to_idx = vocab_to_idx[0]
idx_to_vocab = idx_to_vocab[0]
vocab_size = vocab_size[0]
# 定义编码函数
def encode_sequence(seq):
return [vocab_to_idx.get(char, vocab_to_idx[UNK_TOKEN]) for char in seq]
# 编码所有序列
encoded_sequences = [encode_sequence(seq) for seq in sequences]
# 使用相同的随机种子确保数据划分一致
indices = np.arange(len(encoded_sequences))
train_indices, val_indices = train_test_split(indices, test_size=0.2, random_state=42)
train_seqs_full = [encoded_sequences[i] for i in train_indices]
val_seqs_full = [encoded_sequences[i] for i in val_indices]
chunk_size = 1000 # 每个子集包含的序列数
# 计算子集数量
num_chunks = (len(train_seqs_full) + chunk_size - 1) // chunk_size
print(f"Rank {rank}: 训练集将被分成 {num_chunks} 个子集。")
# 自定义数据集类
class ProteinDataset(Dataset):
def __init__(self, sequences, max_length=2048):
self.sequences = sequences
self.max_length = max_length
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
seq = self.sequences[idx]
max_seq_len = random.randint(100, self.max_length - 2)
seq = seq[:max_seq_len - 2]
seq_input = [vocab_to_idx[SOS_TOKEN]] + seq + [vocab_to_idx[EOS_TOKEN]]
seq_input += [vocab_to_idx[PAD_TOKEN]] * (self.max_length - len(seq_input))
return torch.tensor(seq_input)
def collate_fn(batch):
input_ids = torch.stack(batch)
labels = input_ids.clone()
return input_ids, labels
# 定义模型配置
config = TTTConfig(
vocab_size=vocab_size,
**TTT_STANDARD_CONFIGS['350m'],
max_position_embeddings=2048,
ttt_layer_type='linear',
ttt_base_lr=1.0,
pre_conv=False,
conv_kernel=4,
)
# 检查并加载模型
if os.path.isfile('ttt_model_complete.pth'):
print(f"进程 {rank}:检测到已保存的模型,加载模型...")
model = TTTForCausalLM(config)
checkpoint = torch.load('ttt_model_complete.pth', map_location=device)
model.load_state_dict(checkpoint['model'])
start_epoch = checkpoint.get('epoch', 0)
best_val_loss = checkpoint.get('loss', float('inf'))
print(f"加载模型,epoch {start_epoch},loss: {best_val_loss:.4f}")
else:
print(f"进程 {rank}:未检测到已保存的模型,初始化新模型...")
model = TTTForCausalLM(config)
start_epoch = 0
best_val_loss = float('inf')
model = model.to(device)
model = DDP(model, device_ids=[local_rank], output_device=local_rank)
# 定义优化器和学习率调度器
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)
# 定义混合精度训练的梯度缩放器
scaler = amp.GradScaler()
# Early stopping 和训练相关变量
early_stopping_patience = 3
early_stopping_counter = 0
train_losses = []
val_losses = []
num_epochs = 4
# 保存模型的函数
def save_model(epoch, model, loss, file_path='ttt_model_complete.pth'):
if rank == 0:
state = {
'epoch': epoch,
'model': model.state_dict(),
'loss': loss
}
torch.save(state, file_path)
print(f"模型已保存在 epoch {epoch},路径为: {file_path}")
# 训练循环
for epoch in range(start_epoch, num_epochs):
model.train()
epoch_loss = 0
# 分块处理训练数据
for chunk_idx in range(num_chunks):
start_idx = chunk_idx * chunk_size
end_idx = min((chunk_idx + 1) * chunk_size, len(train_seqs_full))
train_seqs = train_seqs_full[start_idx:end_idx]
print(f"Rank {rank}: 正在处理训练数据子集 {chunk_idx + 1}/{num_chunks}, 包含 {len(train_seqs)} 条序列。")
train_dataset = ProteinDataset(train_seqs)
train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank, shuffle=True)
train_loader = DataLoader(train_dataset, batch_size=2, sampler=train_sampler, collate_fn=collate_fn)
train_sampler.set_epoch(epoch + chunk_idx)
if rank == 0:
progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs} Chunk {chunk_idx + 1}/{num_chunks}", leave=False)
else:
progress_bar = train_loader
for input_ids, labels in progress_bar:
input_ids = input_ids.to(device)
labels = labels.to(device)
optimizer.zero_grad()
with amp.autocast(device_type='cuda'):
outputs = model(input_ids=input_ids, labels=labels)
loss = outputs.loss
scaler.scale(loss).backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
scaler.step(optimizer)
scaler.update()
epoch_loss += loss.item()
if rank == 0:
progress_bar.set_postfix(loss=f"{loss.item():.4f}")
avg_loss = epoch_loss / (len(train_seqs_full))
if rank == 0:
train_losses.append(avg_loss)
print(f"第 {epoch + 1}/{num_epochs} 轮,训练损失:{avg_loss:.4f}")
model.eval()
val_loss = 0
val_chunk_size = 1000
val_num_chunks = (len(val_seqs_full) + val_chunk_size - 1) // val_chunk_size
for val_chunk_idx in range(val_num_chunks):
val_start_idx = val_chunk_idx * val_chunk_size
val_end_idx = min((val_chunk_idx + 1) * val_chunk_size, len(val_seqs_full))
val_seqs = val_seqs_full[val_start_idx:val_end_idx]
print(f"Rank {rank}: 正在处理验证数据子集 {val_chunk_idx + 1}/{val_num_chunks}, 包含 {len(val_seqs)} 条序列。")
val_dataset = ProteinDataset(val_seqs)
val_sampler = DistributedSampler(val_dataset, num_replicas=world_size, rank=rank)
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, sampler=val_sampler, collate_fn=collate_fn)
with torch.no_grad():
for input_ids, labels in val_loader:
input_ids = input_ids.to(device)
labels = labels.to(device)
outputs = model(input_ids=input_ids, labels=labels)
loss = outputs.loss
val_loss += loss.item()
avg_val_loss = val_loss / len(val_seqs_full)
if rank == 0:
val_losses.append(avg_val_loss)
print(f"第 {epoch + 1}/{num_epochs} 轮,验证损失:{avg_val_loss:.4f}")
scheduler.step(avg_val_loss)
save_model(epoch + 1, model.module, avg_val_loss)
if avg_val_loss < best_val_loss:
best_val_loss = avg_val_loss
early_stopping_counter = 0
torch.save(model.module.state_dict(), 'ttt_language_model_best.pth')
print("模型已保存,验证损失降低。")
else:
early_stopping_counter += 1
if early_stopping_counter >= early_stopping_patience:
print(f"验证损失没有改善,提前停止训练。在 epoch {epoch + 1} 停止。")
break
# 加载最优模型进行预测
if rank == 0:
model.module.load_state_dict(torch.load('ttt_language_model_best.pth'))
model.module.to('cpu')
model.module.eval()
def generate_sequence(model, start_tokens, max_length=2048, temperature=1.0):
model.eval()
input_ids = torch.tensor(
[vocab_to_idx[SOS_TOKEN]] + [vocab_to_idx.get(token, vocab_to_idx[UNK_TOKEN]) for token in start_tokens],
device='cpu'
).unsqueeze(0)
generated_ids = model.generate(
input_ids=input_ids,
max_length=max_length,
temperature=temperature,
eos_token_id=vocab_to_idx[EOS_TOKEN],
pad_token_id=vocab_to_idx[PAD_TOKEN],
do_sample=True,
top_k=50,
top_p=0.95
)
generated_sequence = ''.join([idx_to_vocab[token_id.item()] for token_id in generated_ids[0][1:]])
return generated_sequence
start_tokens = random.choice(amino_acids)
generated_seq = generate_sequence(model.module, start_tokens)
print(f"以 '{start_tokens}' 开头生成的序列:{generated_seq}")
下面是SLURM提交
#!/bin/bash
#SBATCH -p
#SBATCH -N 2
#SBATCH --gres=gpu:4 # 申请8个GPU
#SBATCH --ntasks=8 # 总共 8 个任务
#SBATCH --qos=gpugpu
#SBATCH -t 7-00:00:00 # 设置最大运行时间为 7 天
#SBATCH -o output_%j.log # 输出文件(%j为作业号)
#SBATCH -e error_%j.log # 错误文件(%j为作业号)
module load anaconda/2024.02
module load nccl/2.16.5-1_cuda11.8
export MASTER_ADDR=$(scontrol show hostname $SLURM_NODELIST | head -n 1)
export MASTER_PORT=$(shuf -i 10000-65000 -n 1) # 动态选择一个未被占用的端口
#export NCCL_SOCKET_IFNAME=bond0
#export NCCL_BLOCKING_WAIT=1
#export NCCL_DEBUG=INFO
#export NCCL_TIMEOUT=1800
#export WORLD_SIZE=8
# 执行训练脚本
torchrun --nnodes=$SLURM_JOB_NUM_NODES --nproc_per_node=4 --node_rank=$SLURM_NODEID --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT script.py