# Student practice project: RCNN+LSTM trained on the SEED dataset.
# Original author's question: "After running the code, every sample gets the
# same prediction — did my data fail to load?" (This line was plain prose and
# broke the file; kept here as a comment.)
import os
import scipy.io as sio
import numpy as np
import pandas as pd
from keras.models import Model
import tensorflow as tf
from keras.layers import Input, Conv2D, MaxPooling2D, BatchNormalization, GlobalAveragePooling2D, LSTM, Dense, TimeDistributed, Reshape
from keras.models import Model, Sequential
from keras.initializers import GlorotUniform
from RCL_keras import RCL
from keras.optimizers import Adam
# --- Global hyperparameters ---
sampling_rate = 200  # EEG sampling rate in Hz
segment_length = 1  # duration of each segment in seconds
timesteps = sampling_rate * segment_length  # time steps per segment (200)
num_channels = 62  # number of EEG channels
num_classes = 3  # number of emotion classes
K = 96  # number of feature channels in the RCNN
batch_size = 32  # training batch size
epochs = 5  # number of training epochs
initial_learning_rate = 0.001  # initial learning rate for Adam
num_datasets = 1  # total number of datasets to train on
samples_per_dataset = 15  # trials (samples) per dataset
fixed_num_segments = 10  # each sample is split into a fixed 10 segments
fixed_total_timesteps = timesteps * fixed_num_segments  # total steps = 200 * 10 = 2000
# 模型定义
def create_rcnn_feature_extractor(input_shape, K=26):
    """Build the recurrent-convolutional feature extractor.

    A 3x3 conv stem (no bias, BN follows) is downsampled by a stride-2
    pool, then three stages of two RCL blocks each run, with a stride-2
    pool between stages. Global average pooling flattens the output to a
    K-dimensional feature vector per segment.
    """
    extractor = Sequential()
    extractor.add(Conv2D(K, kernel_size=3, padding='same',
                         kernel_initializer='he_normal', use_bias=False,
                         input_shape=input_shape))
    extractor.add(BatchNormalization())
    extractor.add(MaxPooling2D(pool_size=(3, 3), strides=2, padding='same'))
    # Three stages of two recurrent-convolutional layers each; pooling
    # only between stages (not after the last one).
    for stage in range(3):
        extractor.add(RCL(K, steps=2))
        extractor.add(RCL(K, steps=2))
        if stage < 2:
            extractor.add(MaxPooling2D(pool_size=(3, 3), strides=2, padding='same'))
    extractor.add(GlobalAveragePooling2D())
    return extractor
def create_lstm_model(feature_dim, num_classes, timesteps):
    """Build the temporal classifier: LSTM over segment features -> softmax."""
    feature_seq = Input(shape=(timesteps, feature_dim))
    # Single LSTM layer; only the final hidden state feeds the classifier.
    final_state = LSTM(50, return_sequences=False)(feature_seq)
    class_probs = Dense(num_classes, activation='softmax')(final_state)
    return Model(inputs=feature_seq, outputs=class_probs)
# def create_combined_model(input_shape, num_classes, K=96, timesteps=5):
# inputs = Input(shape=(timesteps,) + input_shape) # 输入形状为(分段数, 62, 200, 1)
# feature_extractor = create_rcnn_feature_extractor(input_shape, K)
# feature_dim = K # RCNN 输出的特征维度
# lstm_model = create_lstm_model(feature_dim, num_classes, timesteps)
# inputs = Input(shape=(timesteps,) + input_shape)
# features = TimeDistributed(feature_extractor)(inputs)
# features = Reshape((timesteps, feature_dim))(features)
# outputs = lstm_model(features)
# model = Model(inputs=inputs, outputs=outputs)
# return model
# Model definition: the input shape must match the preprocessed data.
def create_combined_model(input_shape, num_classes, K=96, num_segments=5):
    """Combine the per-segment RCNN extractor with the LSTM classifier.

    The model input is (num_segments,) + input_shape (e.g. (5, 62, 200, 1)).
    Each segment passes through the shared RCNN via TimeDistributed,
    yielding a (num_segments, K) feature sequence for the LSTM head.
    """
    rcnn = create_rcnn_feature_extractor(input_shape, K)
    classifier = create_lstm_model(feature_dim=K, num_classes=num_classes,
                                   timesteps=num_segments)
    segment_batch = Input(shape=(num_segments,) + input_shape)
    segment_features = TimeDistributed(rcnn)(segment_batch)  # (None, num_segments, K)
    predictions = classifier(segment_features)
    return Model(inputs=segment_batch, outputs=predictions)
# --- Module-level model initialization ---
# BUG FIX: the original built the model twice; the second build used
# input_shape=(62, 2000, 1), which does not match the per-segment shape
# (62, 200, 1) produced by preprocess_data and silently replaced the
# correctly-shaped model. Build it once, correctly.
input_shape = (62, 200, 1)  # one segment: (channels, timesteps, 1)
combined_model = create_combined_model(
    input_shape=input_shape,
    num_classes=num_classes,
    K=K,
    num_segments=fixed_num_segments,  # fixed_total_timesteps // timesteps
)
optimizer = Adam(learning_rate=initial_learning_rate)
# sparse_categorical_crossentropy expects integer class indices in
# [0, num_classes) as labels.
combined_model.compile(optimizer=optimizer,
                       loss='sparse_categorical_crossentropy',
                       metrics=['accuracy'])
# def load_dataset(data_path, labels_path):
# data = sio.loadmat(data_path)
# all_samples = [data[f'jl_eeg{i+1}'][:, :fixed_total_timesteps] for i in range(samples_per_dataset)] #使用各自的命名方式时可以选择这样子读取,需要更改变量名
# eeg_data = np.stack(all_samples, axis=0) # 形状 (15, 62, 1000)
# labels = sio.loadmat(labels_path)['label'].flatten()
# return eeg_data, labels
import re
# Note: slicing starts 5 seconds into each trial (5 * 200 = 1000 samples).
def load_dataset(data_path, labels_path):
    """Load one .mat dataset and its labels.

    Returns (eeg_data, labels) where eeg_data has shape
    (samples_per_dataset, 62, fixed_total_timesteps), each trial taken
    starting 5 seconds into the recording.
    """
    mat = sio.loadmat(data_path)
    # Collect variables named like '...eeg<N>' together with their index N.
    numbered = []
    for key in mat.keys():
        if key.startswith('__'):
            continue
        m = re.search(r'eeg(\d+)$', key)
        if m is not None:
            numbered.append((int(m.group(1)), key))
    numbered.sort(key=lambda entry: entry[0])
    if len(numbered) < samples_per_dataset:
        raise ValueError(f"数据集{data_path}中EEG变量不足15个")
    # Slice the window [5 s, 5 s + fixed_total_timesteps) from each trial.
    start_index = 5 * sampling_rate  # 1000
    end_index = start_index + fixed_total_timesteps
    trials = [mat[key][:, start_index:end_index]
              for _, key in numbered[:samples_per_dataset]]
    eeg_data = np.stack(trials, axis=0)  # shape (15, 62, fixed_total_timesteps)
    labels = sio.loadmat(labels_path)['label'].flatten()
    return eeg_data, labels
def preprocess_data(eeg_data, labels):
    """Segment, normalize, and reshape EEG trials for the combined model.

    eeg_data: (num_samples, num_channels, fixed_num_segments * timesteps).
    Returns (X, labels) where X has shape
    (num_samples, fixed_num_segments, num_channels, timesteps, 1).
    """
    num_samples = eeg_data.shape[0]
    # Split each trial into fixed_num_segments contiguous windows, with each
    # segment laid out as (timesteps, num_channels) — equivalent to the
    # per-segment transpose in the original loop.
    segmented = (
        eeg_data[:, :, :fixed_num_segments * timesteps]
        .reshape(num_samples, num_channels, fixed_num_segments, timesteps)
        .transpose(0, 2, 3, 1)
        .astype(np.float64)
    )
    # Per-sample, per-channel z-score over all segments and time steps.
    for sample_idx in range(num_samples):
        means = segmented[sample_idx].mean(axis=(0, 1))
        stds = segmented[sample_idx].std(axis=(0, 1))
        segmented[sample_idx] = (segmented[sample_idx] - means) / (stds + 1e-8)
    # Final layout: (samples, segments, channels, timesteps, 1).
    X_train = segmented.transpose(0, 1, 3, 2)[..., np.newaxis]
    return X_train, labels
import matplotlib.pyplot as plt
def plot_sample(X, y, sample_idx=0):
    """Plot the first segment of the first three channels of one sample."""
    plt.figure(figsize=(12, 6))
    for ch in range(3):
        plt.subplot(3, 1, ch + 1)
        # First segment of channel `ch` for the chosen sample.
        plt.plot(X[sample_idx, 0, ch, :, 0])
        plt.title(f"Label: {y[sample_idx]}, Channel: {ch}")
    plt.tight_layout()
    plt.show()
# --- Dataset path templates ---
data_dir = 'C:/Users/小董喜欢花花/Desktop/代码/ddd/'
# Data files, e.g. 1_1.mat, 2_1.mat, 3_1.mat / 1_2.mat, 2_2.mat, 3_2.mat:
# the first number is the session group, the second the subject. A uniform
# naming scheme is assumed; other schemes would need a more elaborate loader.
data_template = f'{data_dir}/{{}}_1.mat'
#data_template = f'{data_dir}/{{}}_2.mat'  # alternative naming scheme (does not load everything)
# Label file; here all datasets share the same labels. Use the pattern above
# if each dataset had its own label file (e.g. labels_1.mat).
labels_template = f'{data_dir}/label.mat'
from keras.callbacks import Callback
# --- Training procedure ---
def main():
    """Train the RCNN+LSTM model on the SEED datasets, then evaluate on a
    held-out file and save the predictions."""
    # Build and compile the model.
    input_shape = (62, 200, 1)  # one segment: (channels, timesteps, 1)
    combined_model = create_combined_model(input_shape, num_classes, K,
                                           num_segments=fixed_num_segments)
    optimizer = Adam(learning_rate=initial_learning_rate)
    combined_model.compile(optimizer=optimizer,
                           loss='sparse_categorical_crossentropy',
                           metrics=['accuracy'])

    class PredictionMonitor(Callback):
        """Print the softmax distribution of 5 random training samples per epoch."""
        def __init__(self, X):
            super().__init__()
            self.X = X

        def on_epoch_end(self, epoch, logs=None):
            sample_indices = np.random.choice(len(self.X), 5)
            predictions = self.model.predict(self.X[sample_indices])
            print("\n预测样例分布:")
            for i, pred in enumerate(predictions):
                print(f"样本 {i+1}: {np.round(pred, 2)}")

    # Train over all datasets.
    for dataset_idx in range(1, num_datasets + 1):
        data_path = data_template.format(dataset_idx)
        labels_path = labels_template.format(dataset_idx)
        eeg_data, labels = load_dataset(data_path, labels_path)
        X_train, y_train = preprocess_data(eeg_data, labels)
        # BUG FIX (likely cause of "all predictions identical"): SEED labels
        # are {-1, 0, 1}, but sparse_categorical_crossentropy requires class
        # indices in [0, num_classes). Negative labels silently corrupt the
        # loss. Shift labels into {0, 1, 2} — TODO confirm label range.
        y_train = np.asarray(y_train)
        if y_train.min() < 0:
            y_train = y_train - y_train.min()
        # plot_sample(X_train, y_train)
        print(f"Training on dataset {dataset_idx}...")
        # BUG FIX: the original called fit() twice back-to-back, training
        # 2*epochs and discarding the first run's history. Train once and
        # keep the history from that single run.
        history = combined_model.fit(X_train, y_train,
                                     epochs=epochs,
                                     batch_size=batch_size,
                                     verbose=1,
                                     callbacks=[PredictionMonitor(X_train)])
        print(f"数据集 {dataset_idx} 训练结果:")
        print(f"- 最后损失值: {history.history['loss'][-1]:.4f}")
        print(f"- 最后准确率: {history.history['accuracy'][-1]:.4f}")

    # ========== Test phase ==========
    # Held-out test set (adjust paths to the real test files).
    test_data_path = os.path.join(data_dir, "7_1.mat")
    test_labels_path = os.path.join(data_dir, "label.mat")
    test_eeg, test_labels = load_dataset(test_data_path, test_labels_path)
    X_test, y_test = preprocess_data(test_eeg, test_labels)
    # Apply the same label shift so saved true/predicted labels share the
    # same {0..num_classes-1} space.
    y_test = np.asarray(y_test)
    if y_test.min() < 0:
        y_test = y_test - y_test.min()
    predictions = combined_model.predict(X_test)
    predicted_classes = np.argmax(predictions, axis=1)
    confidences = np.max(predictions, axis=1)
    save_results(y_test, predicted_classes, confidences, predictions)
def save_results(true_labels, pred_classes, confidences, prob_matrix):
    """Save prediction results to CSV and MAT files.

    Args:
        true_labels: iterable of ground-truth class indices.
        pred_classes: iterable of predicted class indices.
        confidences: iterable of max softmax probability per sample.
        prob_matrix: 2-D array-like of shape (num_samples, num_classes)
            with per-class softmax probabilities.

    Writes prediction_results/predictions.csv and predictions.mat next to
    this script and prints both output paths.
    """
    prob_matrix = np.asarray(prob_matrix)
    # Generalized from the original hard-coded 3 classes: emit one
    # probability column per class actually present in prob_matrix.
    n_prob_cols = prob_matrix.shape[1] if prob_matrix.ndim == 2 else 0
    results = []
    for i in range(len(true_labels)):
        row = {
            "SampleID": i + 1,
            "TrueLabel": int(true_labels[i]),
            "PredictedLabel": int(pred_classes[i]),
            "Confidence": float(confidences[i]),
        }
        for c in range(n_prob_cols):
            row[f"Prob_Class{c}"] = float(prob_matrix[i][c])
        results.append(row)
    df = pd.DataFrame(results)
    # Create the output directory next to this script.
    result_dir = os.path.join(os.path.dirname(__file__), "prediction_results")
    os.makedirs(result_dir, exist_ok=True)
    csv_path = os.path.join(result_dir, "predictions.csv")
    df.to_csv(csv_path, index=False)
    print(f"预测结果已保存至:{csv_path}")
    # Also save a MAT copy for MATLAB-side analysis.
    mat_path = os.path.join(result_dir, "predictions.mat")
    sio.savemat(mat_path, {
        'true_labels': true_labels,
        'predicted_labels': pred_classes,
        'confidence': confidences,
        'prob_matrix': prob_matrix
    })
    print(f"MAT格式结果已保存至:{mat_path}")
# Run training + evaluation only when executed as a script.
if __name__ == "__main__":
    main()