When I denoise audio with a wavelet neural network and then reconstruct it, the reconstructed signal differs drastically from the original signal. What is the cause: a flaw in the code logic, or badly tuned parameters?

Evaluation results

Main code
import numpy as np
from work_01 import load_audio_data
from work_01 import remove_silence
from work_02 import preprocess_signal
from work_033 import extract_wavelet_features
from work_044 import create_wavelet_neural_network
from work_05 import calculate_snr, calculate_stoi, calculate_pesq
import os
from sklearn.model_selection import train_test_split
from work_066 import create_reconstruct_audio
import tensorflow as tf
clean_dir = "data/clean_audio_files_01"
noisy_dir = "data/noisy_audio_files_01"
# clean_dir = "data/clean_audio_files"
# noisy_dir = "data/noisy_audio_files"
clean_files = os.listdir(clean_dir)
noisy_files = os.listdir(noisy_dir)
# Make sure the files in clean_files and noisy_files correspond in the same order
clean_paths = [os.path.join(clean_dir, file) for file in clean_files]
noisy_paths = [os.path.join(noisy_dir, file) for file in noisy_files]
clean_signals_list = []  # stores each clean_signal
noisy_signals_list = []  # stores each noisy_signal
sample_rates_list = []   # stores each sample_rate
X_train = []
y_train = []
batch_size = 108  # batch size for loading (affects the results)
for i in range(0, len(clean_paths), batch_size):
    clean_batch = clean_paths[i:i+batch_size]
    noisy_batch = noisy_paths[i:i+batch_size]
    batch_clean_signals = []
    batch_noisy_signals = []
    batch_sample_rates = []
    batch_X_train = []
    batch_y_train = []
    # clean and noisy are paired path lists, so iterate over them together with zip
    for clean_path, noisy_path in zip(clean_batch, noisy_batch):
        clean_signal, sample_rate = load_audio_data(clean_path)
        noisy_signal, _ = load_audio_data(noisy_path)
        clean_signal = remove_silence(clean_signal, sample_rate)
        noisy_signal = remove_silence(noisy_signal, sample_rate)
        clean_frames = preprocess_signal(clean_signal, sample_rate)
        noisy_frames = preprocess_signal(noisy_signal, sample_rate)
        clean_features = extract_wavelet_features(clean_frames)
        noisy_features = extract_wavelet_features(noisy_frames)
        batch_clean_signals.append(clean_signal)
        batch_noisy_signals.append(noisy_signal)
        batch_sample_rates.append(sample_rate)
        batch_X_train.append(noisy_features)
        batch_y_train.append(clean_features)
    X_train.extend(batch_X_train)
    y_train.extend(batch_y_train)
    clean_signals_list.extend(batch_clean_signals)
    noisy_signals_list.extend(batch_noisy_signals)
    sample_rates_list.extend(batch_sample_rates)
X_original = np.array(X_train)
y_original = np.array(y_train)
#----------------------------------------------------------
# Split the raw audio signals into training / validation / test sets
clean_signals_train, clean_signals_interim, noisy_signals_train, noisy_signals_interim, sample_rates_train, sample_rates_interim = train_test_split(clean_signals_list, noisy_signals_list, sample_rates_list, test_size=0.3, random_state=42)
clean_signals_val, clean_signals_test, noisy_signals_val, noisy_signals_test, sample_rates_val, sample_rates_test = train_test_split(clean_signals_interim, noisy_signals_interim, sample_rates_interim, test_size=0.5, random_state=42)
#-------------------------------------------------------
# 4. Create the model
# First split: 70% training set, 30% interim remainder
X_train, X_interim, y_train, y_interim, sample_rates_train, sample_rates_interim = train_test_split(X_original, y_original, sample_rates_list, test_size=0.3, random_state=42)
# Second split: divide the remainder into 50% validation and 50% test
X_val, X_test, y_val, y_test, sample_rates_val, sample_rates_test = train_test_split(X_interim, y_interim, sample_rates_interim, test_size=0.5, random_state=42)
print("Modified X_train shape:", X_train.shape)
print("Modified y_train shape:", y_train.shape)
input_shape = X_train.shape[1:]  # the model's input shape follows the shape of the training data
print("input_shape[0]:", input_shape[0])
model, lr_schedule = create_wavelet_neural_network(input_shape)  # should return a compiled Keras model ready to train
epochs = 200
validation_data = (X_val, y_val)
X_train_tensor = tf.convert_to_tensor(X_train, dtype=tf.float32)
y_train_tensor = tf.convert_to_tensor(y_train, dtype=tf.float32)
# Make sure validation_data is also converted to tensors
validation_data_tensor = (tf.convert_to_tensor(validation_data[0], dtype=tf.float32),
                          tf.convert_to_tensor(validation_data[1], dtype=tf.float32))
# Train the model: noisy features are the input, clean features the target, for `epochs` epochs
history = model.fit(X_train_tensor, y_train_tensor, epochs=epochs, validation_data=validation_data_tensor, verbose=1, callbacks=[lr_schedule])
# Evaluate the model on the training set (not a real evaluation; mainly useful for debugging)
train_loss = model.evaluate(X_train, y_train)
# Read metrics recorded during training, e.g. the validation loss
val_loss = history.history['val_loss']
# Evaluating on the test set would be the key step for judging generalization
# test_loss = model.evaluate(X_test, y_test)
# Save the model
model.save('denoising_model.h5', save_format='h5')  # HDF5 format: architecture, weights and optimizer state
# Save the model architecture as a JSON file
model_json = model.to_json()
with open("denoising_model.json", "w") as json_file:
    json_file.write(model_json)
# Save the model weights as an HDF5 file
model.save_weights("denoising_model_weights.h5")
#---------------------------------------------------------
# Batch prediction. Note: this predicts on the training features X_train, not on X_test,
# so everything downstream measures training performance rather than generalization.
denoised_features = model.predict(X_train)  # denoised_features holds the model's denoised output
print("Modified X_train shape:", X_train.shape)
print(f"denoised_features.shape: {denoised_features.shape}")
#-------------------------------------------
# 6. Filtering / reconstruction - the denoised features are used as the example here
reconstructed_audios = create_reconstruct_audio(denoised_features, sample_rates_train, clean_signals_train)
#-------------------------------------------
# 5. SNR, PESQ and STOI evaluation
# Compute SNR (the higher the SNR, the better the denoising)
# Flatten denoised_features into a 1-D array
denoised_features_flat = denoised_features.flatten()
snr_value = calculate_snr(y_train.flatten(), denoised_features_flat)
print(f"Signal-to-Noise Ratio (SNR): {snr_value} dB")
#-----------------------------------------------------
# PESQ and STOI evaluation
# Lists to store the per-sample scores
pesq_scores = []
stoi_scores = []
for sample_rate, clean_signal, reconstructed_signal in zip(sample_rates_train, clean_signals_train, reconstructed_audios):
    # The two signals must have the same length
    if len(clean_signal) != len(reconstructed_signal):
        raise ValueError("Clean and reconstructed signals must have the same length.")
    # PESQ score
    pesq_value = calculate_pesq(clean_signal, reconstructed_signal)
    pesq_scores.append(pesq_value)
    # STOI score
    stoi_value = calculate_stoi(clean_signal, reconstructed_signal, sample_rate)
    stoi_scores.append(stoi_value)
# Average PESQ and STOI over the whole dataset
avg_pesq_score = np.mean(pesq_scores)
avg_stoi_score = np.mean(stoi_scores)
# avg_snr_value = np.mean(snr_values)
# Print the average scores
# print(f"Average Signal-to-Noise Ratio (SNR): {avg_snr_value} dB")
print(f"Average PESQ Score on the dataset: {avg_pesq_score}")
print(f"Average STOI Score on the dataset: {avg_stoi_score}")
# Append the training history to a file
with open('work_history.txt', 'a') as file:
    file.write("\nTraining history:\n")
    file.write("Epochs: " + str(epochs) + "\n")
    # file.write("validation_data: " + str(validation_data) + "\n")
    file.write("Loss on the training set: " + str(train_loss) + "\n")
    file.write("Validation Loss from History: " + str(val_loss[-1]) + "\n")  # loss at the final epoch
    # file.write("Loss on the test set: " + str(test_loss) + "\n")
    file.write("Validation loss history: " + str(val_loss) + "\n")  # this is validation loss, not a test-set loss
    file.write("Average Signal-to-Noise Ratio (SNR): " + str(snr_value) + " dB\n")
    file.write("PESQ on the training set: " + str(avg_pesq_score) + "\n")
    file.write("STOI on the training set: " + str(avg_stoi_score) + "\n")
Audio reconstruction code
import numpy as np
import librosa
from scipy.io import wavfile
import os
import pywt
from work_photo import plot_time_domain_detail, plot_frequency_domain, plot_error_time_domain, plot_error_frequency_domain
def create_reconstruct_audio(denoised_features_list, sample_rates_list, clean_signals_list):
    reconstructed_audios = []
    # Reconstruct an audio signal from each denoised feature array
    for i, denoised_features in enumerate(denoised_features_list):
        sample_rate = sample_rates_list[i]
        clean_signal = clean_signals_list[i]
        print(f"Shape of feature array {i}: {denoised_features.shape}")
        print("Clean Signal:")
        print(clean_signal)
        # Handle NaN values
        denoised_features = denoised_features.astype(np.float64)
        denoised_features = np.nan_to_num(denoised_features, nan=np.nanmean(denoised_features))
        print(f"denoised_features: {denoised_features}")
        print(f"denoised_features.shape: {denoised_features.shape}")
        # Wavelet decomposition into a list of coefficient arrays (db6 wavelet, 2 levels)
        coeffs = pywt.wavedec(denoised_features, 'db6', level=2)
        # Inverse wavelet transform. Note: waverec applied directly to the output of
        # wavedec is essentially an identity transform on denoised_features; it does
        # not invert whatever extract_wavelet_features did to the original frames.
        reconstructed_signal = pywt.waverec(coeffs, 'db6')
        # Scale the audio into the range [-1, 1]
        reconstructed_signal = reconstructed_signal / np.max(np.abs(reconstructed_signal))
        # Optionally amplify the waveform
        amplification_factor = 1  # adjust as needed
        reconstructed_signal = reconstructed_signal * amplification_factor
        # Convert the reconstructed audio to floating point
        reconstructed_signal = reconstructed_signal.astype(np.float64)
        print(f"reconstructed_signal (float): {reconstructed_signal}")
        # Check for NaN values
        if np.isnan(reconstructed_signal).any():
            print(f"NaN values present; check the reconstruction. Sample index: {i}")
        # print(f"Clean signal length: {len(clean_signal)}")
        # print(f"Reconstructed signal length: {len(reconstructed_signal)}")
        # Match the length of the reconstructed signal to the original signal
        if len(reconstructed_signal) != len(clean_signal):
            reconstructed_signal = np.resize(reconstructed_signal, len(clean_signal))
        # Assume every audio signal shares the same sample rate, sample_rate
        reconstructed_signal = np.array(reconstructed_signal)
        desired_length = sample_rate // 4  # a quarter of a second
        if len(reconstructed_signal) < desired_length:
            reconstructed_signal = np.pad(reconstructed_signal, (0, desired_length - len(reconstructed_signal)),
                                          'constant')
        # print(f"Clean signal length: {len(clean_signal)}")
        # print(f"Reconstructed signal length: {len(reconstructed_signal)}")
        # Collect the reconstructed waveform
        reconstructed_audios.append(reconstructed_signal)
        clean_signal = np.array(clean_signal)
        reconstructed_signal = np.array(reconstructed_signal)
        # Duration of the clean signal in seconds
        clean_signal_duration = len(clean_signal) / sample_rate
        # Duration of the reconstructed signal in seconds
        reconstructed_signal_duration = len(reconstructed_signal) / sample_rate
        print(f"Clean signal duration: {clean_signal_duration:.2f} s")
        print(f"Reconstructed signal duration: {reconstructed_signal_duration:.2f} s")
        # Plotting
        start_idx = int(sample_rate * 1)  # sample index at the start of second 1
        end_idx = int(sample_rate * 2)    # sample index at the end of second 2
        plot_time_domain_detail(clean_signal, reconstructed_signal, start_idx, end_idx)
        plot_frequency_domain(clean_signal, reconstructed_signal, sample_rate)
        plot_error_time_domain(clean_signal, reconstructed_signal)
        plot_error_frequency_domain(clean_signal, reconstructed_signal, sample_rate)
    # Save the audio files
    output_folder = 'data/output_audio_files'
    # Create the folder if it does not exist
    os.makedirs(output_folder, exist_ok=True)
    # Write each reconstructed signal into the output folder
    for i, audio_data in enumerate(reconstructed_audios):
        output_path = os.path.join(output_folder, f'reconstructed_audio_{i}.wav')
        wavfile.write(output_path, sample_rates_list[i], audio_data)
    return reconstructed_audios
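
One more thing worth ruling out when listening to the output: scipy.io.wavfile.write stores float64 arrays as 64-bit float WAV files, which some players render incorrectly or refuse to open, so bad playback is not necessarily the model's fault. A small helper like the sketch below (write_pcm16 is a name made up for this example) converts to 16-bit PCM before writing:

import numpy as np
from scipy.io import wavfile

def write_pcm16(path, sample_rate, signal):
    # Clip to [-1, 1] and scale to the int16 range before writing.
    signal = np.clip(np.asarray(signal, dtype=np.float64), -1.0, 1.0)
    wavfile.write(path, sample_rate, (signal * 32767.0).astype(np.int16))

Listening to the same reconstructed signal written this way makes it easy to tell file-format artifacts apart from genuine reconstruction errors.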