VAE代码,模型跑通了,用evaluate函数输出了loss就可以了,但不知道如何根据loss和原始数据计算TP、FP、TN、FN这四个指标,再通过这4个指标来计算准确率、F-score这些数值,以及画出ROC曲线、计算AUC面积。使用的数据集为CICIoT2023,X_train为训练集,Y_test为测试集,Y_test_normal和Y_test_abnormal为测试集正常和异常部分。
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Lambda, Conv1D, Flatten, SpatialDropout1D, Reshape
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras import backend as K
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import pandas as pd
import time
# 读取数据
csv_path_train = 'CICIoT2023/CICIoT2023/benign.csv'
X_train = pd.read_csv(csv_path_train).values
X_train = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(X_train)))
X_train = np.reshape(X_train, (-1, 100, 46))
csv_path_test = 'CICIoT2023/CICIoT2023/ceshi.csv'
Y_test = pd.read_csv(csv_path_test)
Y_test_normal = Y_test[Y_test.label == 'BenignTraffic'].drop(labels='label', axis=1).values
Y_test_normal = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(Y_test_normal)))
Y_test_normal = np.reshape(Y_test_normal, (-1, 100, 46))
Y_test_abnormal = Y_test[Y_test.label != 'BenignTraffic'].drop(labels='label', axis=1).values
Y_test_abnormal = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(Y_test_abnormal)))
Y_test_abnormal = np.reshape(Y_test_abnormal, (-1, 100, 46))
original_dim = 46 # 特征维度
latent_dim = 2 # 潜在空间维度
intermediate_dim = 256
batch_size = 100
# 采样函数
def sampling(args):
z_mean, z_log_var = args
batch = K.shape(z_mean)[0]
dim = K.int_shape(z_mean)[1]
epsilon = K.random_normal(shape=(batch, dim))
return z_mean + K.exp(0.5 * z_log_var) * epsilon
# 编码器
inputs = Input(shape=(100, original_dim))
x = Conv1D(64, kernel_size=3, activation='relu', padding='same')(inputs)
x = Flatten()(x)
z_mean = Dense(latent_dim)(x)
z_log_var = Dense(latent_dim)(x)
z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')
# 解码器
latent_inputs = Input(shape=(latent_dim,))
x = Dense(100 * 64, activation='relu')(latent_inputs)
x = Reshape((100, 64))(x) # 将展平的层重新调整为三维
x = Conv1D(64, kernel_size=3, activation='relu', padding='same')(x)
outputs = Conv1D(original_dim, kernel_size=3, activation='sigmoid', padding='same')(x)
decoder = Model(latent_inputs, outputs, name='decoder')
# VAE模型
outputs = decoder(encoder(inputs)[2]) # 连接编码器和解码器
vae = Model(inputs, outputs, name='vae')
# 定义损失函数
reconstruction_loss = tf.keras.losses.binary_crossentropy(K.flatten(inputs), K.flatten(outputs))
reconstruction_loss *= 100 # 对应维度
kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
kl_loss = K.sum(kl_loss, axis=-1) * -0.5
vae_loss = K.mean(reconstruction_loss + kl_loss)
vae.add_loss(vae_loss)
vae.compile(optimizer='adam')
test_first_d = Y_test_abnormal.shape[0]
print(f"test_normal:{Y_test_normal.shape[0]}")
print(f"test_abnormal:{Y_test_abnormal.shape[0]}")
# 训练VAE
startTime = time.time() # 开始时间
vae.fit(X_train, epochs=100, batch_size=batch_size, validation_data=(Y_test_normal, None))
losssum1=0
losssum2=0
for i in range(1,batch_size):
loss11=vae.evaluate(Y_test_normal)
losssum1=+loss11
loss22 = vae.evaluate(Y_test_abnormal)
losssum2=+loss22
print(f"loss1:{loss11}")
print(f"loss2:{loss22}")
endTime = time.time() #结束时间
print(f"Took {round((endTime - startTime), 5)} seconds to calculate.")
vae.summary()
print("Testing data shape:", Y_test_normal.shape)
result = vae.evaluate(Y_test_normal)
print("Evaluate result:", result)
loss1 = vae.evaluate(Y_test_normal)
loss2 = vae.evaluate(Y_test_abnormal)
print(f"loss1:{loss1}")
print(f"loss2:{loss2}")
# 假设 time_steps 应该等于 test_first_d
#input_data1 = np.expand_dims(np.ones((test_first_d, 1)), axis=2) # 在最后一个轴上增加一个维度
# 或者使用 reshape 方法
input_data1 = np.ones((test_first_d, 1, 1)).reshape((test_first_d, 1, 1))
acc_normal = vae.evaluate(input_data1)
#input_data2 = np.expand_dims(np.zeros((test_first_d, 1)), axis=2) # 在最后一个轴上增加一个维度
input_data2 = np.zeros((test_first_d, 1, 1)).reshape((test_first_d, 1, 1))
acc_abnormal = vae.evaluate(input_data2)
#_, acc_normal = vae.evaluate(Y_test_normal, np.ones((test_first_d, 1)))
#_, acc_abnormal = vae.evaluate(Y_test_abnormal, np.zeros((test_first_d, 1)))
#acc_normal = vae.evaluate(np.ones((test_first_d, 1)))
#acc_abnormal = vae.evaluate(np.zeros((test_first_d, 1)))
print(f"acc_normal:{acc_normal}")
print(f"acc_abnormal:{acc_abnormal}")
TP = acc_normal
TN = 1 - TP
FN = acc_abnormal
FP = 1 - FN
precision = TP / (TP + FP)
recall = TP / (TP + FN)
f1 = 2 * (precision * recall) / (precision + recall)
endTime = time.time() # 结束时间
print(f"acc:{(acc_normal + acc_abnormal) * 50}%")
print(f"precision:{precision}")
print(f"recall:{recall}")
print(f"F1-score:{f1}")
打印结果和报错截图:



