Help! This is due soon and it still doesn't work. Can anyone help me fix it?
1. Objectives:
- Understand the basic principles of end-to-end speech recognition systems and how they differ from traditional pipelines.
- Master basic speech feature extraction, in particular the theory and implementation of MFCC features.
- Verify the effectiveness and advantages of the Transformer for Mandarin speech recognition.
- Understand how end-to-end models are trained and optimized.
2. Tasks:
- Obtain the dataset: download the AISHELL Mandarin speech corpus AISHELL-1 from https://www.aishelltech.com/kysjcp and extract all archives into the working folder. The audio files are .wav files; the corresponding transcripts are in aishell_transcript_v0.8.txt, keyed by audio file name. Given the long training time, using only half of AISHELL-1 is acceptable.
- Load the audio and standardize it to a 16 kHz sample rate; organize it into a dataset in which each sample pairs an audio signal with its transcript.
- Preprocessing: extract MFCC features from the audio files with the librosa library (see the single-file sketch after this list).
- With AI assistance, design a Transformer-based end-to-end speech recognition model. Required settings: batch_size, epochs, optimizer: AdamW, loss_function: CTC Loss.
- Train and optimize the model; save the best checkpoint.
- Testing and evaluation: report sentence-level recognition accuracy, character error rate (CER) and word error rate (WER) computed with the jiwer library (a toy jiwer example follows this list), plus model training time and prediction time.
- Write the report: misrecognition analysis, runtime analysis, and a summary of directions for improvement.
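Before wiring everything up, it helps to confirm the MFCC shapes on a single file. A minimal sketch; the filename is a placeholder for any extracted AISHELL-1 utterance:

import librosa
# load one utterance, resampled to 16 kHz on the fly
audio, sr = librosa.load("BAC009S0002W0122.wav", sr=16000)
# 40 coefficients, 25 ms window (n_fft=400 samples), 10 ms hop (160 samples)
mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=40, n_fft=400, hop_length=160)
print(mfcc.shape)  # (40, T); transpose to (T, 40) before feeding the model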
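And since evaluation leans on jiwer, a toy example (made-up strings) shows why CER is the meaningful metric for unsegmented Chinese:

import jiwer
ref = "今天天气很好"
hyp = "今天天汽很好"  # one substituted character
print(jiwer.cer(ref, hyp))  # 1 error over 6 characters, about 0.167
# without spaces, jiwer treats each sentence as a single "word", so WER
# degenerates to a sentence-level mismatch indicator:
print(jiwer.wer(ref, hyp))  # 1.0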
import os
import time
import h5py
import jiwer
import librosa
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torch.cuda.amp import autocast, GradScaler
# Configuration
DATA_ROOT = r"C:\Users\www\AISHELL1_part_wav"
BATCH_SIZE = 128     # larger batches improve throughput
EPOCHS = 15          # fewer epochs to keep training time manageable
MAX_SAMPLES = 50000  # cap the training subset to speed up the experiment
NUM_WORKERS = 8      # tune to the number of CPU cores
MODEL_DIM = 192      # reduced model width
NUM_LAYERS = 4       # fewer Transformer layers
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Dataset that reads raw wavs and computes MFCC features (used for caching)
class AISHELLDataset(Dataset):
def __init__(self, data_root, split='train'):
self.data_root = os.path.join(data_root, split)
self.transcripts = {}
label_path = os.path.join(data_root, "aishell_transcript_v0.8.txt")
with open(label_path, 'r', encoding='utf-8') as f:
for line in f:
parts = line.strip().split(maxsplit=1)
if len(parts) == 2:
self.transcripts[parts[0]] = parts[1]
self.audio_files = []
for root, _, files in os.walk(self.data_root):
for file in files:
if file.endswith('.wav'):
audio_id = os.path.splitext(file)[0]
if audio_id in self.transcripts:
self.audio_files.append(os.path.join(root, file))
def __len__(self):
return min(len(self.audio_files), MAX_SAMPLES) if 'train' in self.data_root else len(self.audio_files)
    def __getitem__(self, idx):
        wav_path = self.audio_files[idx]
        audio, sr = librosa.load(wav_path, sr=16000)  # resample to 16 kHz on load
        # 40-dim MFCCs, 25 ms window, 10 ms hop -> frames-first shape [T, 40]
        mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=40, n_fft=400, hop_length=160).T
        # per-utterance mean/variance normalization
        mfcc = (mfcc - mfcc.mean(axis=0)) / (mfcc.std(axis=0) + 1e-8)
        text = self.transcripts[os.path.splitext(os.path.basename(wav_path))[0]]
        return {
            'mfcc': torch.FloatTensor(mfcc),
            'text': text,
            'input_len': mfcc.shape[0],
            'target_len': len(text)
        }
# One-off feature preprocessing: cache MFCCs and transcripts to HDF5
def preprocess_features():
for split in ['train', 'dev', 'test']:
dataset = AISHELLDataset(DATA_ROOT, split)
h5_path = os.path.join(DATA_ROOT, f"{split}_features.h5")
with h5py.File(h5_path, 'w') as hf:
for i in range(len(dataset)):
item = dataset[i]
grp = hf.create_group(str(i))
grp.create_dataset('mfcc', data=item['mfcc'].numpy())
grp.attrs['text'] = item['text']
grp.attrs['input_len'] = item['input_len']
grp.attrs['target_len'] = item['target_len']
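# Caching to HDF5 is a one-time cost that pays off across epochs: MFCC extraction
# otherwise dominates runtime for a model this small, so later epochs read
# precomputed features instead of re-running librosa on every wav.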
# Dataset backed by the HDF5 feature cache
class OptimizedDataset(Dataset):
    def __init__(self, split):
        self.h5_path = os.path.join(DATA_ROOT, f"{split}_features.h5")
        self.h5_file = None  # opened lazily: h5py handles cannot be pickled, so each DataLoader worker opens its own
        with h5py.File(self.h5_path, 'r') as hf:
            self.keys = list(hf.keys())
            if split == 'train':
                all_text = [hf[k].attrs['text'] for k in self.keys]
                chars = sorted(set(''.join(all_text)))
                # index 0 is reserved for the CTC blank, index 1 for unseen characters
                self.char_dict = {'<blank>': 0, '<unk>': 1,
                                  **{c: i + 2 for i, c in enumerate(chars)}}
            else:
                # build a train dataset only to reuse its vocabulary
                train_set = OptimizedDataset('train')
                self.char_dict = train_set.char_dict
        self.idx2char = {i: c for c, i in self.char_dict.items()}  # inverse map for decoding

    def _file(self):
        if self.h5_file is None:
            self.h5_file = h5py.File(self.h5_path, 'r')
        return self.h5_file

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, idx):
        grp = self._file()[self.keys[idx]]
        # characters absent from the training vocabulary map to <unk>, never to the blank
        text_indices = [self.char_dict.get(c, 1) for c in grp.attrs['text']]
        return {
            'mfcc': torch.FloatTensor(grp['mfcc'][:]),
            'text': grp.attrs['text'],
            'text_idx': torch.LongTensor(text_indices),
            'input_len': grp.attrs['input_len'],
            'target_len': grp.attrs['target_len']
        }
# Lightweight Transformer acoustic model
class LiteTransformer(nn.Module):
    def __init__(self, input_dim, vocab_size):
        super().__init__()
        # conv front end: project MFCCs to MODEL_DIM and halve the time axis
        self.conv = nn.Sequential(
            nn.Conv1d(input_dim, MODEL_DIM, 3, padding=1),
            nn.ReLU(),
            nn.Conv1d(MODEL_DIM, MODEL_DIM, 3, stride=2, padding=1),
        )
        # note: no explicit positional encoding is used; the conv front end only
        # injects local order information, a deliberate simplification here
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=MODEL_DIM,
                nhead=4,
                dim_feedforward=MODEL_DIM*2,
                dropout=0.1,
                activation='gelu'
            ), num_layers=NUM_LAYERS
        )
        self.classifier = nn.Linear(MODEL_DIM, vocab_size)

    def forward(self, x):
        x = x.permute(0, 2, 1)  # [B, T, 40] -> [B, 40, T]
        x = self.conv(x)        # [B, D, T/2]
        x = x.permute(2, 0, 1)  # [T/2, B, D], seq-first as the encoder expects
        # output: [T/2, B, vocab] log-probs, already the (T, N, C) layout that
        # nn.CTCLoss expects, so it goes to the loss without any permute
        return self.classifier(self.transformer(x)).log_softmax(dim=-1)
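# Quick shape check (vocab size 10 is just a stand-in): ~1 s of 16 kHz audio with
# hop_length=160 gives ~100 frames, and the stride-2 conv halves that to 50:
#   m = LiteTransformer(40, 10)
#   print(m(torch.randn(2, 100, 40)).shape)  # torch.Size([50, 2, 10])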
def collate_fn(batch):
    return {
        'mfcc': pad_sequence([b['mfcc'] for b in batch], batch_first=True),
        'text': [b['text'] for b in batch],
        'text_idx': pad_sequence([b['text_idx'] for b in batch], batch_first=True, padding_value=0),
        # the stride-2 conv halves the time axis; floor division can undershoot the
        # true output length by one frame for odd T, which is safe for CTC
        'input_lens': torch.tensor([b['input_len'] for b in batch]) // 2,
        'target_lens': torch.tensor([b['target_len'] for b in batch])
    }
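# For example, two utterances of 120 and 90 frames collate to an 'mfcc' tensor of
# shape (2, 120, 40), with 'input_lens' == tensor([60, 45]) after the halving.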
# Training for one epoch, with mixed precision and gradient clipping
def train_epoch(model, loader, criterion, optimizer, scaler):
    model.train()
    total_loss = 0.0
    for batch in loader:
        optimizer.zero_grad()
        with autocast():
            outputs = model(batch['mfcc'].to(DEVICE, non_blocking=True))  # (T, N, C)
            # CTCLoss expects (T, N, C) log-probs, which is exactly the model's
            # output layout, so the outputs are passed through unpermuted
            loss = criterion(
                outputs,
                batch['text_idx'].to(DEVICE, non_blocking=True),
                batch['input_lens'].to(DEVICE),
                batch['target_lens'].to(DEVICE)
            )
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)  # unscale first so the clip threshold is in true gradient units
        nn.utils.clip_grad_norm_(model.parameters(), 2.0)
        scaler.step(optimizer)
        scaler.update()
        total_loss += loss.item()
    return total_loss / len(loader)
def evaluate(model, loader, criterion):
    model.eval()
    total_loss = 0.0
    all_refs, all_hyps = [], []
    idx2char = loader.dataset.idx2char  # index -> character, for decoding
    with torch.no_grad():
        for batch in loader:
            outputs = model(batch['mfcc'].to(DEVICE))  # (T, N, C) log-probs
            loss = criterion(
                outputs,
                batch['text_idx'].to(DEVICE),
                batch['input_lens'].to(DEVICE),
                batch['target_lens'].to(DEVICE)
            )
            total_loss += loss.item()
            preds = outputs.argmax(dim=2)  # (T, N) frame-wise best tokens
            for i in range(preds.size(1)):
                # greedy CTC decoding: keep valid (unpadded) frames only,
                # collapse consecutive repeats, then drop blanks
                valid = preds[:int(batch['input_lens'][i]), i].tolist()
                decoded, prev = [], -1
                for idx in valid:
                    if idx != prev and idx != 0:
                        decoded.append(idx2char.get(idx, ''))
                    prev = idx
                all_hyps.append(''.join(decoded))
                all_refs.append(batch['text'][i])
    return {
        'loss': total_loss / len(loader),
        'cer': jiwer.cer(all_refs, all_hyps),
        # on unsegmented Chinese every sentence is one "word" to jiwer, so this
        # WER is effectively a sentence error rate
        'wer': jiwer.wer(all_refs, all_hyps)
    }
def main():
    # one-off feature preprocessing (only runs if a cache is missing)
    if not all(os.path.exists(os.path.join(DATA_ROOT, f"{split}_features.h5")) for split in ['train', 'dev', 'test']):
        print("Preprocessing features...")
        preprocess_features()
    # initialize datasets
train_set = OptimizedDataset('train')
dev_set = OptimizedDataset('dev')
test_set = OptimizedDataset('test')
    # data loaders
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True,
collate_fn=collate_fn, num_workers=NUM_WORKERS, pin_memory=True)
dev_loader = DataLoader(dev_set, batch_size=BATCH_SIZE*2, collate_fn=collate_fn)
    # model, optimizer, loss
    model = LiteTransformer(40, len(train_set.char_dict)).to(DEVICE)  # char_dict already counts <blank> and <unk>
    optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=0.01)
    criterion = nn.CTCLoss(blank=0, zero_infinity=True)  # zero_infinity guards against inf losses early in training
    scaler = GradScaler()
    # training loop
    best_cer = float('inf')
    print(f"Starting training on device: {DEVICE}")
    print(f"Train samples: {len(train_set)}, dev samples: {len(dev_set)}")
for epoch in range(EPOCHS):
start_time = time.time()
train_loss = train_epoch(model, train_loader, criterion, optimizer, scaler)
        # evaluate on dev every 2 epochs (and on the final epoch)
        if (epoch+1) % 2 == 0 or epoch == EPOCHS-1:
            dev_metrics = evaluate(model, dev_loader, criterion)
            print(f"Epoch {epoch+1}/{EPOCHS} | Time: {time.time()-start_time:.1f}s")
            print(f"Train Loss: {train_loss:.4f} | Dev CER: {dev_metrics['cer']:.4f}")
            if dev_metrics['cer'] < best_cer:
                best_cer = dev_metrics['cer']
                torch.save(model.state_dict(), "best_model.pth")
                print("Saved new best model!")
    # final test with the best checkpoint
    model.load_state_dict(torch.load("best_model.pth", map_location=DEVICE))
    test_loader = DataLoader(test_set, batch_size=BATCH_SIZE*2, collate_fn=collate_fn)
    test_metrics = evaluate(model, test_loader, criterion)
    print(f"\nFinal test results: CER={test_metrics['cer']:.4f}, WER={test_metrics['wer']:.4f}")
if __name__ == "__main__":
main()
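With those fixes the script should run end to end. The first run is dominated by feature preprocessing (it builds the three HDF5 caches); later runs train straight from the cache. If the DataLoader workers cause trouble on Windows, dropping NUM_WORKERS to 0 is the quickest sanity check.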