GAN代码,模型跑通了,计算出了准确率。但无法用evaluate函数输出loss,并根据loss和原始数据计算TP、FP、TN、FN这四个指标,再通过这4个指标来计算准确率、F-score这些数值,以及画出ROC曲线、计算AUC面积。使用的数据集为CICIoT2023,X_train为训练集,Y_test为测试集,Y_test_normal和Y_test_abnormal为测试集正常和异常部分。
from __future__ import print_function, division
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from keras.datasets import mnist
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Conv1D, GRU, Dropout, InputLayer, MaxPool1D, GlobalMaxPool1D
from tensorflow.keras.layers import BatchNormalization, Activation, ZeroPadding2D, Softmax
from keras.layers.convolutional import UpSampling2D, Conv2D
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import sys
import numpy as np
import pandas as pd
import time
class GAN:
def __init__(self):
self.seq_len = 100
self.d_model = 46
self.img_shape = (self.seq_len, self.d_model)
optimizer = Adam(0.0002, 0.5)
# Build and compile the discriminator
self.discriminator = self.build_discriminator()
self.discriminator.compile(loss='binary_crossentropy',
optimizer=optimizer,
metrics=['accuracy'])
# Build the generator
self.generator = self.build_generator()
# The generator takes noise as input and generates imgs
z = Input(shape=self.img_shape)
img = self.generator(z)
# For the combined model we will only train the generator
self.discriminator.trainable = False
# The discriminator takes generated images as input and determines validity
validity = self.discriminator(img)
# The combined model (stacked generator and discriminator)
# Trains the generator to fool the discriminator
self.combined = Model(z, validity)
self.combined.compile(loss='mse', optimizer=optimizer)
def build_generator(self):
model = Sequential()
model.add(Input(shape=self.img_shape))
model.add(GRU(256, input_shape=self.img_shape, activation='relu'))
# model.add(Dense(256, input_dim=self.latent_dim))
# model.add(Dense(512, activation='relu'))
# model.add(BatchNormalization(momentum=0.8))
model.add(Dense(128))
model.add(Dense(128, activation='relu'))
model.add(BatchNormalization(momentum=0.8))
model.add(Dense(np.prod(self.img_shape), activation='tanh'))
model.add(Reshape(self.img_shape))
model.summary()
#noise = Input(shape=(self.latent_dim, 1))
noise = Input(shape=self.img_shape)
# noise=Input(shape=self.img_shape)
img = model(noise)
return Model(noise, img)
def build_discriminator(self):
model = Sequential()
model.add(Input(shape=self.img_shape))
model.add(Conv1D(1024, kernel_size=3, strides=2, padding='same', activation='relu'))
model.add(Dense(512))
model.add(Dense(512, activation='relu'))
model.add(Dense(256))
model.add(Dense(64, activation='relu'))
model.add(GlobalMaxPool1D())
model.add(Dense(1, activation='sigmoid'))
model.summary()
img = Input(shape=self.img_shape)
validity = model(img)
return Model(img, validity)
def train(self, X_train, epochs, batch_size=128, sample_interval=50):
# Adversarial ground truths
valid = np.ones((batch_size, 1))
fake = np.zeros((batch_size, 1))
print(f"valid:{valid.shape}")
print(f"fake:{fake.shape}")
for epoch in range(epochs):
# ---------------------
# Train Discriminator
# ---------------------
# Select a random batch of images
idx = np.random.randint(0, X_train.shape[0], batch_size)
imgs = X_train[idx]
noise = np.random.normal(0, 1, (batch_size, self.seq_len, self.d_model))
print(f"noise:{noise.shape}")
# Generate a batch of new images
gen_imgs = self.generator.predict(noise)
# Train the discriminator
d_loss_real = self.discriminator.train_on_batch(imgs, valid)
d_loss_fake = self.discriminator.train_on_batch(gen_imgs, fake)
d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
# ---------------------
# Train Generator
# ---------------------
noise = np.random.normal(0, 1, (batch_size, self.seq_len, self.d_model))
print(f"noise:{noise.shape}")
# Train the generator (to have the discriminator label samples as valid)
g_loss = self.combined.train_on_batch(noise, fake)
# Plot the progress
print("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100 * d_loss[1], g_loss))
# # If at save interval => save generated image samples
# if epoch % sample_interval == 0:
# self.sample_images(epoch)
if __name__ == '__main__':
# Load the dataset
# csv文件路径
csv_path_train = 'CICIoT2023/CICIoT2023/benign.csv'
# 读取数据
X_train = pd.read_csv(csv_path_train)
X_train = X_train.values
X_train = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(X_train)))
X_train = np.reshape(X_train, (-1,100,46))
print(f"train:{X_train.shape}")
idx = np.random.randint(0, X_train.shape[0], 16)
imgs = X_train[idx]
# print(imgs.shape)
print(f"imgs:{imgs.shape}")
# noise = np.random.normal(0, 1, (16, 100, 1))
# # print(noise.shape)
# print(f"noise:{noise.shape}")
# csv文件路径
csv_path_test = 'CICIoT2023/CICIoT2023/ceshi.csv'
Y_test = pd.read_csv(csv_path_test)
#Y_test_normal = Y_test[Y_test.label == 'BenignTraffic'].drop(labels='label', axis=1).values
Y_test_abnormal = Y_test[Y_test.label != 'BenignTraffic'].drop(labels='label', axis=1).values
Y_test_abnormal = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(Y_test_abnormal)))
#Y_test_normal = np.reshape(Y_test_normal, (-1,100,46))
Y_test_abnormal = np.reshape(Y_test_abnormal, (-1, 100, 46))
test_first_d = Y_test_abnormal.shape[0]
Y_test_normal =X_train[:test_first_d, :, :]
#Y_test_normal = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(Y_test_normal)))
print(f"test_normal:{Y_test_normal.shape}")
print(f"test_abnormal:{Y_test_abnormal.shape}")
batch_size = 16
startTime = time.time() # 开始时间
gan = GAN()
gan.train(X_train=X_train, epochs=100 , batch_size=batch_size, sample_interval=200)
losssum1 = 0
losssum2 = 0
for i in range(1, batch_size):
loss11 = gan.discriminator.evaluate(Y_test_normal)
losssum1 = +loss11
loss22 = gan.discriminator.evaluate(Y_test_abnormal)
losssum2 = +loss22
print(f"loss1:{loss11}")
print(f"loss2:{loss22}")
endTime = time.time() # 结束时间
_, acc_normal = gan.discriminator.evaluate(Y_test_normal, np.ones((test_first_d, 1)))
_, acc_abnormal = gan.discriminator.evaluate(Y_test_abnormal, np.zeros((test_first_d, 1)))
print(f"acc:{(acc_normal+acc_abnormal)*50}%")
from sklearn.metrics import roc_curve, auc
# 预测部分
def evaluate_gan(X_test, model):
# 获取重构输出
reconstructions = model.predict(X_test)
# 计算重构误差
reconstruction_errors = np.mean(np.abs(X_test - reconstructions), axis=(1, 2))
return reconstruction_errors
# 计算阈值
def find_threshold(y_true, y_pred):
fpr, tpr, thresholds = roc_curve(y_true, y_pred)
# 选择使FPR和TPR之差最大的阈值
optimal_idx = np.argmax(tpr - fpr)
optimal_threshold = thresholds[optimal_idx]
return optimal_threshold
# 获取重构误差
recon_errors_normal = evaluate_gan(Y_test_normal, gan.discriminator)
recon_errors_abnormal = evaluate_gan(Y_test_abnormal, gan.discriminator)
# 合并数据
recon_errors = np.concatenate([recon_errors_normal, recon_errors_abnormal])
labels = np.array([0]*len(recon_errors_normal) + [1]*len(recon_errors_abnormal))
# 找到最佳阈值
optimal_threshold = find_threshold(labels, recon_errors)
# 使用阈值进行二分类
predictions = (recon_errors > optimal_threshold).astype(int)
from sklearn.metrics import confusion_matrix # 导入混淆矩阵计算函数
# 计算混淆矩阵
cm = confusion_matrix(labels, predictions)
TP = cm[1, 1]
TN = cm[0, 0]
FP = cm[0, 1]
FN = cm[1, 0]
# 计算评价指标
accuracy = (TP + TN) / (TP + TN + FP + FN)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
f1 = 2 * (precision * recall) / (precision + recall)
# 绘制ROC曲线并计算AUC
fpr, tpr, _ = roc_curve(labels, recon_errors)
roc_auc = auc(fpr, tpr)
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
print(f"AUC: {roc_auc}")
# 绘制ROC曲线
import matplotlib.pyplot as plt
plt.figure()
plt.plot(fpr, tpr, label=f'AUC = {roc_auc:.2f}')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()
报错如下:

