e555666ee 2025-05-25 00:29
Question closed

My speech recognition code still isn't working

Help! The deadline is almost here and I still can't get this right. Could someone help me fix it?
I. Experiment objectives:

  1. Understand the basic principles of end-to-end speech recognition systems and how they differ from traditional speech recognition systems.
  2. Master basic speech feature extraction methods, in particular the principle and implementation of MFCC feature extraction.
  3. Verify the effectiveness and advantages of the Transformer for Chinese speech recognition.
  4. Understand how end-to-end models are trained and optimized.

II. Experiment tasks:

  1. Obtain the dataset: download the AISHELL-1 Mandarin Chinese speech corpus from https://www.aishelltech.com/kysjcp and extract all archives into the current folder. The audio files are .wav files; the corresponding text labels are in aishell_transcript_v0.8.txt, where each transcript ID matches the audio file name. Since training takes a long time, only half of AISHELL-1 may be used.
  2. Read the audio data and standardize it to a 16 kHz sampling rate; organize it into a dataset in which each sample pairs an audio signal with its text.
  3. Data preprocessing: extract MFCC features from the audio files with the librosa library (see the sketch after this list).
  4. With AI assistance, design a Transformer-based end-to-end speech recognition model. Required settings: batch_size, epochs, optimizer: AdamW, loss_function: CTC Loss.
  5. Train and optimize the model. Save the best model.
  6. Testing and evaluation: report sentence-level recognition accuracy, use the jiwer library to compute the character error rate (CER) and word error rate (WER), and measure training time and inference time.
  7. Write the experiment report: analysis of recognition errors, analysis of runtime efficiency, and a summary of directions for improvement.
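
A minimal standalone sketch of steps 2 and 3 (loading audio at 16 kHz and extracting 40-dim MFCCs with librosa); the wav path below is just a placeholder:

import librosa

wav_path = "path/to/utterance.wav"                # placeholder path
audio, sr = librosa.load(wav_path, sr=16000)      # librosa resamples to 16 kHz on load
# 40-dim MFCCs with a 25 ms window (400 samples) and 10 ms hop (160 samples); shape [frames, 40]
mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=40, n_fft=400, hop_length=160).T
print(mfcc.shape)

My full attempt is below: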


import os
import time
import math
import h5py
import jiwer
import librosa
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torch.cuda.amp import autocast, GradScaler

# Configuration
DATA_ROOT = r"C:\Users\www\AISHELL1_part_wav"
BATCH_SIZE = 128      # larger batch size for higher throughput
EPOCHS = 15           # fewer training epochs
MAX_SAMPLES = 50000   # use only part of the data to speed up the experiment
NUM_WORKERS = 8       # adjust to the number of CPU cores
MODEL_DIM = 192       # smaller model dimension
NUM_LAYERS = 4        # fewer Transformer layers
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dataset class (used during feature preprocessing)
class AISHELLDataset(Dataset):
    def __init__(self, data_root, split='train'):
        self.data_root = os.path.join(data_root, split)
        self.transcripts = {}
        label_path = os.path.join(data_root, "aishell_transcript_v0.8.txt")
        
        with open(label_path, 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.strip().split(maxsplit=1)
                if len(parts) == 2:
                    self.transcripts[parts[0]] = parts[1]
        
        self.audio_files = []
        for root, _, files in os.walk(self.data_root):
            for file in files:
                if file.endswith('.wav'):
                    audio_id = os.path.splitext(file)[0]
                    if audio_id in self.transcripts:
                        self.audio_files.append(os.path.join(root, file))

    def __len__(self):
        return min(len(self.audio_files), MAX_SAMPLES) if 'train' in self.data_root else len(self.audio_files)

    def __getitem__(self, idx):
        wav_path = self.audio_files[idx]
        audio, sr = librosa.load(wav_path, sr=16000)
        mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=40, n_fft=400, hop_length=160).T
        mfcc = (mfcc - mfcc.mean(axis=0)) / (mfcc.std(axis=0) + 1e-8)
        text = self.transcripts[os.path.splitext(os.path.basename(wav_path))[0]]
        return {
            'mfcc': torch.FloatTensor(mfcc),
            'text': text,
            'input_len': mfcc.shape[0],
            'target_len': len(text)
        }

# Feature preprocessing: extract MFCCs once and cache them as HDF5 files
def preprocess_features():
    for split in ['train', 'dev', 'test']:
        dataset = AISHELLDataset(DATA_ROOT, split)
        h5_path = os.path.join(DATA_ROOT, f"{split}_features.h5")
        
        with h5py.File(h5_path, 'w') as hf:
            for i in range(len(dataset)):
                item = dataset[i]
                grp = hf.create_group(str(i))
                grp.create_dataset('mfcc', data=item['mfcc'].numpy())
                grp.attrs['text'] = item['text']
                grp.attrs['input_len'] = item['input_len']
                grp.attrs['target_len'] = item['target_len']

# Optimized dataset that reads the cached HDF5 features
class OptimizedDataset(Dataset):
    def __init__(self, split):
        self.h5_path = os.path.join(DATA_ROOT, f"{split}_features.h5")
        self.h5_file = None  # opened lazily in __getitem__ so the dataset stays picklable for DataLoader workers
        with h5py.File(self.h5_path, 'r') as hf:
            self.keys = list(hf.keys())
            if split == 'train':
                all_text = [hf[k].attrs['text'] for k in self.keys]
                chars = sorted(set(''.join(all_text)))
                self.char_dict = {'<blank>': 0, **{c: i+1 for i, c in enumerate(chars)}}
        if split != 'train':
            # reuse the character vocabulary built from the training split
            self.char_dict = OptimizedDataset('train').char_dict
        # inverse mapping used for greedy CTC decoding during evaluation
        self.idx2char = {i: c for c, i in self.char_dict.items()}

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, idx):
        if self.h5_file is None:
            self.h5_file = h5py.File(self.h5_path, 'r')
        grp = self.h5_file[self.keys[idx]]
        # characters not in the training vocabulary fall back to index 0 (blank)
        text_indices = [self.char_dict.get(c, 0) for c in grp.attrs['text']]
        return {
            'mfcc': torch.FloatTensor(grp['mfcc'][:]),
            'text': grp.attrs['text'],
            'text_idx': torch.LongTensor(text_indices),
            'input_len': grp.attrs['input_len'],
            'target_len': grp.attrs['target_len']
        }

# Lightweight Transformer acoustic model (conv subsampling front-end + Transformer encoder)
class LiteTransformer(nn.Module):
    def __init__(self, input_dim, vocab_size):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv1d(input_dim, MODEL_DIM, 3, padding=1),
            nn.ReLU(),
            nn.Conv1d(MODEL_DIM, MODEL_DIM, 3, stride=2, padding=1),
        )
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=MODEL_DIM,
                nhead=4,
                dim_feedforward=MODEL_DIM*2,
                dropout=0.1,
                activation='gelu'
            ), num_layers=NUM_LAYERS
        )
        self.classifier = nn.Linear(MODEL_DIM, vocab_size)

    def forward(self, x):
        x = x.permute(0, 2, 1)  # [B, C, T]
        x = self.conv(x)        # [B, D, T/2]
        x = x.permute(2, 0, 1)  # [T/2, B, D]
        return self.classifier(self.transformer(x)).log_softmax(dim=-1)

def collate_fn(batch):
    return {
        'mfcc': pad_sequence([b['mfcc'] for b in batch], batch_first=True),
        'text': [b['text'] for b in batch],
        'text_idx': pad_sequence([b['text_idx'] for b in batch], batch_first=True, padding_value=0),
        'input_lens': torch.tensor([b['input_len'] for b in batch]) // 2,  # halved to match the stride-2 conv output
        'target_lens': torch.tensor([b['target_len'] for b in batch])
    }

# Training function (one epoch)
def train_epoch(model, loader, criterion, optimizer, scaler):
    model.train()
    total_loss = 0.0
    for batch in loader:
        optimizer.zero_grad()
        with autocast():
            # model output is already [T, B, V] log-probs, which is the (T, N, C) layout nn.CTCLoss expects
            outputs = model(batch['mfcc'].to(DEVICE, non_blocking=True))
            loss = criterion(
                outputs,
                batch['text_idx'].to(DEVICE, non_blocking=True),
                batch['input_lens'].to(DEVICE),
                batch['target_lens'].to(DEVICE)
            )
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)
        nn.utils.clip_grad_norm_(model.parameters(), 2.0)
        scaler.step(optimizer)
        scaler.update()
        total_loss += loss.item()
    return total_loss / len(loader)

def evaluate(model, loader, criterion):
    model.eval()
    total_loss = 0.0
    all_refs, all_hyps = [], []
    with torch.no_grad():
        for batch in loader:
            # model output is [T, B, V]; nn.CTCLoss expects (T, N, C), so it is passed unpermuted
            outputs = model(batch['mfcc'].to(DEVICE))
            loss = criterion(
                outputs,
                batch['text_idx'].to(DEVICE),
                batch['input_lens'].to(DEVICE),
                batch['target_lens'].to(DEVICE)
            )
            total_loss += loss.item()
            # greedy CTC decoding: argmax per frame, collapse repeated labels, then drop blanks
            preds = outputs.argmax(dim=2)  # [T, B]
            idx2char = loader.dataset.idx2char
            for i in range(preds.size(1)):
                valid = int(batch['input_lens'][i])  # frames after the stride-2 conv front-end
                seq = preds[:valid, i].tolist()
                collapsed = [s for j, s in enumerate(seq) if s != 0 and (j == 0 or s != seq[j-1])]
                all_hyps.append(''.join(idx2char.get(s, '') for s in collapsed))
                all_refs.append(batch['text'][i])
    return {
        'loss': total_loss / len(loader),
        'cer': jiwer.cer(all_refs, all_hyps),
        'wer': jiwer.wer(all_refs, all_hyps)
    }

def main():
    # Feature preprocessing (only needed on the first run)
    if not all(os.path.exists(os.path.join(DATA_ROOT, f"{split}_features.h5")) for split in ['train', 'dev', 'test']):
        print("Preprocessing features...")
        preprocess_features()
    
    # Initialize the datasets
    train_set = OptimizedDataset('train')
    dev_set = OptimizedDataset('dev')
    test_set = OptimizedDataset('test')
    
    # Data loaders
    train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True,
                            collate_fn=collate_fn, num_workers=NUM_WORKERS, pin_memory=True)
    dev_loader = DataLoader(dev_set, batch_size=BATCH_SIZE*2, collate_fn=collate_fn)
    
    # Initialize the model; char_dict already includes the CTC blank at index 0
    model = LiteTransformer(40, len(train_set.char_dict)).to(DEVICE)
    optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=0.01)
    criterion = nn.CTCLoss(blank=0)
    scaler = GradScaler()
    
    # Training loop
    best_cer = float('inf')
    print(f"Starting training on device: {DEVICE}")
    print(f"Training samples: {len(train_set)}, validation samples: {len(dev_set)}")
    
    for epoch in range(EPOCHS):
        start_time = time.time()
        train_loss = train_epoch(model, train_loader, criterion, optimizer, scaler)
        
        if (epoch+1) % 2 == 0 or epoch == EPOCHS-1:
            dev_metrics = evaluate(model, dev_loader, criterion)
            print(f"Epoch {epoch+1}/{EPOCHS} | Time: {time.time()-start_time:.1f}s")
            print(f"Train Loss: {train_loss:.4f} | Dev CER: {dev_metrics['cer']:.4f}")
            
            if dev_metrics['cer'] < best_cer:
                best_cer = dev_metrics['cer']
                torch.save(model.state_dict(), "best_model.pth")
                print("保存新的最佳模型!")

    # Final test
    model.load_state_dict(torch.load("best_model.pth"))
    test_loader = DataLoader(test_set, batch_size=BATCH_SIZE*2, collate_fn=collate_fn)
    test_metrics = evaluate(model, test_loader, criterion)
    print(f"\n最终测试结果:CER={test_metrics['cer']:.4f}, WER={test_metrics['wer']:.4f}")

if __name__ == "__main__":
    main()


4 answers

  • 檀越@新空间 2025-05-25 05:50

    Good morning ☀️☀️☀️
    This answer was written with reference to Tongyi Qianwen.

    Judging from your description, your code already covers most of the steps needed for an end-to-end speech recognition system, but a few details may be preventing it from running correctly or from reaching the expected results. Here is an analysis and some suggested improvements:


    Known issues and suggested improvements

    1. The OptimizedDataset class definition is incomplete

    You mentioned that the OptimizedDataset class definition is incomplete and is missing key parts such as the __getitem__ method. This method is the core of a dataset class and is responsible for returning the data for each sample.

    Solution: add a __getitem__ method to the OptimizedDataset class and make sure it correctly loads the data from the HDF5 file.

    class OptimizedDataset(Dataset):
        def __init__(self, split):
            self.h5_path = os.path.join(DATA_ROOT, f"{split}_features.h5")
            self.h5_file = h5py.File(self.h5_path, 'r')
            self.keys = list(self.h5_file.keys())
            
            if split == 'train':
                all_text = [self.h5_file[k].attrs['text'] for k in self.keys]
                chars = sorted(set(''.join(all_text)))
                self.char_dict = {'<blank>':0, **{c:i+1 for i,c in enumerate(chars)}}
            else:
                train_set = OptimizedDataset('train')
                self.char_dict = train_set.char_dict
    
        def __len__(self):
            return len(self.keys)
    
        def __getitem__(self, idx):
            key = self.keys[idx]
            group = self.h5_file[key]
            mfcc = torch.tensor(group['mfcc'][:], dtype=torch.float32)
            text = group.attrs['text']
            input_len = group.attrs['input_len']
            target_len = group.attrs['target_len']
            text_indices = torch.tensor([self.char_dict.get(c, 0) for c in text], dtype=torch.long)
            return {
                'mfcc': mfcc,
                'text': text,
                'text_indices': text_indices,
                'input_len': input_len,
                'target_len': target_len
            }
    

    2. Dataset preprocessing not completed

    Although you defined the preprocess_features() function, it does not appear to be called. If the dataset features have not been generated yet, the model cannot work properly.

    Solution: call preprocess_features() at the beginning of the script to make sure the features have already been preprocessed.

    if __name__ == "__main__":
        preprocess_features()
    

    3. The Transformer model implementation is missing

    You mentioned that you need to implement a Transformer-based end-to-end speech recognition model, but the concrete model definition does not appear in the code.

    Solution: here is a simple Transformer-based implementation:

    class TransformerModel(nn.Module):
        def __init__(self, input_dim, vocab_size, model_dim=MODEL_DIM, num_layers=NUM_LAYERS, num_heads=4, ff_dim=512, dropout=0.1):
            super(TransformerModel, self).__init__()
            self.input_dim = input_dim
            self.model_dim = model_dim
            self.num_heads = num_heads
            self.ff_dim = ff_dim
            self.dropout = dropout
            
            # project the 40-dim MFCC frames up to the encoder dimension
            self.input_proj = nn.Linear(input_dim, model_dim)
            
            self.encoder = nn.TransformerEncoder(
                nn.TransformerEncoderLayer(d_model=model_dim, nhead=num_heads, dim_feedforward=ff_dim,
                                           dropout=dropout, batch_first=True),
                num_layers=num_layers
            )
            
            self.decoder = nn.Linear(model_dim, vocab_size)
        
        def forward(self, src, src_mask=None, src_key_padding_mask=None):
            # src: [B, T, input_dim]
            enc_out = self.encoder(self.input_proj(src), mask=src_mask, src_key_padding_mask=src_key_padding_mask)
            output = self.decoder(enc_out)  # [B, T, vocab_size]
            return output
    

    4. Implementing the data loaders

    You need to use DataLoader to load the dataset. Since OptimizedDataset returns dictionary-style samples, text_indices must be batched (padded) in a custom collate function.

    Solution:

    def collate_fn(batch):
        mfccs = [item['mfcc'] for item in batch]
        texts = [item['text'] for item in batch]
        text_indices = [item['text_indices'] for item in batch]
        input_lens = torch.tensor([item['input_len'] for item in batch], dtype=torch.int32)
        target_lens = torch.tensor([item['target_len'] for item in batch], dtype=torch.int32)
        
        # Pad the MFCC features to a common length
        mfccs_padded = pad_sequence(mfccs, batch_first=True)
        text_indices_padded = pad_sequence(text_indices, batch_first=True, padding_value=0)
        
        return {
            'mfccs': mfccs_padded,
            'texts': texts,
            'text_indices': text_indices_padded,
            'input_lens': input_lens,
            'target_lens': target_lens
        }
    
    # Create the data loaders
    train_loader = DataLoader(OptimizedDataset('train'), batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn, num_workers=NUM_WORKERS)
    dev_loader = DataLoader(OptimizedDataset('dev'), batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn, num_workers=NUM_WORKERS)
    test_loader = DataLoader(OptimizedDataset('test'), batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn, num_workers=NUM_WORKERS)
    

    5. Choice of loss function

    You mentioned using CTC Loss, but it is not implemented in the code. CTC Loss is a loss function designed specifically for sequence labeling tasks and is well suited to speech recognition.

    Solution:

    ctc_loss = nn.CTCLoss(blank=0, zero_infinity=True).to(DEVICE)
    

    During training, compute the loss as follows:

    def compute_loss(pred, target, input_lengths, target_lengths):
        pred_log_probs = torch.log_softmax(pred, dim=-1)
        loss = ctc_loss(pred_log_probs.transpose(0, 1), target, input_lengths, target_lengths)
        return loss
    

    6. Training and optimization

    Here is a complete training-loop example:

    model = TransformerModel(input_dim=40, vocab_size=len(train_set.char_dict)).to(DEVICE)
    optimizer = optim.AdamW(model.parameters(), lr=1e-4)
    scaler = GradScaler()
    
    for epoch in range(EPOCHS):
        model.train()
        total_loss = 0
        start_time = time.time()
        
        for batch in train_loader:
            mfccs = batch['mfccs'].to(DEVICE)
            text_indices = batch['text_indices'].to(DEVICE)
            input_lens = batch['input_lens'].to(DEVICE)
            target_lens = batch['target_lens'].to(DEVICE)
            
            optimizer.zero_grad()
            with autocast():
                output = model(mfccs)
                loss = compute_loss(output, text_indices, input_lens, target_lens)
            
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            
            total_loss += loss.item()
        
        avg_loss = total_loss / len(train_loader)
        elapsed_time = time.time() - start_time
        print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {avg_loss:.4f}, Time: {elapsed_time:.2f}s")
    

    Summary

    With the improvements above, your code should run correctly and complete the end-to-end speech recognition task. The key points to finish are:

    1. Call preprocess_features() to generate the features.
    2. Complete the __getitem__ method of the OptimizedDataset class.
    3. Implement the Transformer model.
    4. Train with CTC Loss.
    5. Load the data with DataLoader and a custom collate_fn.

    I hope these improvements help you finish the experiment! For the evaluation step, see the sketch below.
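
    The checklist above does not cover step 6 of the assignment (decoding and CER/WER evaluation). Here is a minimal sketch of greedy CTC decoding plus jiwer metrics; the idx2char mapping (the inverse of char_dict) and the toy values are assumptions for illustration, not something taken from your code:

    import torch
    import jiwer

    def greedy_ctc_decode(frame_log_probs, idx2char, blank=0):
        """frame_log_probs: [T, vocab] frame-level scores for one utterance."""
        ids = frame_log_probs.argmax(dim=-1).tolist()
        out, prev = [], blank
        for i in ids:
            if i != blank and i != prev:   # collapse repeated labels, then drop blanks
                out.append(idx2char.get(i, ''))
            prev = i
        return ''.join(out)

    # Toy usage with dummy scores; in practice, loop over the test loader and collect refs/hyps.
    idx2char = {0: '<blank>', 1: '你', 2: '好'}
    dummy = torch.tensor([[0.1, 2.0, 0.1],
                          [0.1, 2.0, 0.1],
                          [2.0, 0.1, 0.1],
                          [0.1, 0.1, 2.0]])
    hyp = greedy_ctc_decode(dummy, idx2char)   # -> '你好'
    print(hyp, jiwer.cer(['你好'], [hyp]), jiwer.wer(['你好'], [hyp]))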

    This answer was accepted as the best answer by the asker.

Question timeline

  • Question closed by the system on June 19
  • Answer accepted on June 11
  • Question created on May 25