SilyaSophie 2024-12-18 23:21 · Closed

How do I output results from a GAN model and plot them?

My GAN code runs and reports an accuracy, but I cannot use the evaluate function to output the loss, use the loss and the original data to compute TP, FP, TN and FN, derive accuracy and F-score from those four counts, or plot the ROC curve and compute the AUC. The dataset is CICIoT2023: X_train is the training set, Y_test is the test set, and Y_test_normal and Y_test_abnormal are its normal and abnormal parts.

from __future__ import print_function, division
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Conv1D, GRU, Dropout, InputLayer, MaxPool1D, GlobalMaxPool1D
from tensorflow.keras.layers import BatchNormalization, Activation, ZeroPadding2D, Softmax
from tensorflow.keras.layers import UpSampling2D, Conv2D
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam

import matplotlib.pyplot as plt

import sys

import numpy as np
import pandas as pd
import time

class GAN:
    def __init__(self):
        self.seq_len = 100
        self.d_model = 46
        self.img_shape = (self.seq_len, self.d_model)


        optimizer = Adam(0.0002, 0.5)

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
                                   optimizer=optimizer,
                                   metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # The generator takes noise as input and generates imgs
        z = Input(shape=self.img_shape)
        img = self.generator(z)

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        validity = self.discriminator(img)

        # The combined model  (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(z, validity)
        self.combined.compile(loss='mse', optimizer=optimizer)

    def build_generator(self):
        model = Sequential()
        model.add(Input(shape=self.img_shape))
        model.add(GRU(256, input_shape=self.img_shape, activation='relu'))
        # model.add(Dense(256, input_dim=self.latent_dim))
        # model.add(Dense(512, activation='relu'))
        # model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(128))
        model.add(Dense(128, activation='relu'))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(np.prod(self.img_shape), activation='tanh'))
        model.add(Reshape(self.img_shape))

        model.summary()

        #noise = Input(shape=(self.latent_dim, 1))
        noise = Input(shape=self.img_shape)
        # noise=Input(shape=self.img_shape)
        img = model(noise)

        return Model(noise, img)

    def build_discriminator(self):
        model = Sequential()
        model.add(Input(shape=self.img_shape))
        model.add(Conv1D(1024, kernel_size=3, strides=2, padding='same', activation='relu'))
        model.add(Dense(512))
        model.add(Dense(512, activation='relu'))
        model.add(Dense(256))
        model.add(Dense(64, activation='relu'))
        model.add(GlobalMaxPool1D())
        model.add(Dense(1, activation='sigmoid'))


        model.summary()
        img = Input(shape=self.img_shape)
        validity = model(img)
        return Model(img, validity)

    def train(self, X_train, epochs, batch_size=128, sample_interval=50):
        # Adversarial ground truths
        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        print(f"valid:{valid.shape}")
        print(f"fake:{fake.shape}")

        for epoch in range(epochs):
            # ---------------------
            #  Train Discriminator
            # ---------------------

            # Select a random batch of images
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            imgs = X_train[idx]

            noise = np.random.normal(0, 1, (batch_size, self.seq_len, self.d_model))
            print(f"noise:{noise.shape}")
            # Generate a batch of new images
            gen_imgs = self.generator.predict(noise)



            # Train the discriminator
            d_loss_real = self.discriminator.train_on_batch(imgs, valid)
            d_loss_fake = self.discriminator.train_on_batch(gen_imgs, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            noise = np.random.normal(0, 1, (batch_size, self.seq_len, self.d_model))
            print(f"noise:{noise.shape}")
            # Train the generator (to have the discriminator label samples as valid)
            g_loss = self.combined.train_on_batch(noise, fake)

            # Plot the progress
            print("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100 * d_loss[1], g_loss))

            # # If at save interval => save generated image samples
            # if epoch % sample_interval == 0:
            #     self.sample_images(epoch)


if __name__ == '__main__':
    # Load the dataset
    # Path to the training CSV file
    csv_path_train = 'CICIoT2023/CICIoT2023/benign.csv'
    # Read the data
    X_train = pd.read_csv(csv_path_train)
    X_train = X_train.values
    X_train = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(X_train)))

    X_train = np.reshape(X_train, (-1,100,46))
    print(f"train:{X_train.shape}")
    idx = np.random.randint(0, X_train.shape[0], 16)
    imgs = X_train[idx]
    # print(imgs.shape)
    print(f"imgs:{imgs.shape}")
    # noise = np.random.normal(0, 1, (16, 100, 1))
    # # print(noise.shape)
    # print(f"noise:{noise.shape}")

    # Path to the test CSV file
    csv_path_test = 'CICIoT2023/CICIoT2023/ceshi.csv'
    Y_test = pd.read_csv(csv_path_test)
    #Y_test_normal = Y_test[Y_test.label == 'BenignTraffic'].drop(labels='label', axis=1).values
    Y_test_abnormal = Y_test[Y_test.label != 'BenignTraffic'].drop(labels='label', axis=1).values
    Y_test_abnormal = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(Y_test_abnormal)))
    #Y_test_normal = np.reshape(Y_test_normal, (-1,100,46))
    Y_test_abnormal = np.reshape(Y_test_abnormal, (-1, 100, 46))


    test_first_d = Y_test_abnormal.shape[0]
    Y_test_normal =X_train[:test_first_d, :, :]
    #Y_test_normal = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(Y_test_normal)))
    print(f"test_normal:{Y_test_normal.shape}")
    print(f"test_abnormal:{Y_test_abnormal.shape}")

    batch_size = 16

    startTime = time.time()  # start time
    gan = GAN()
    gan.train(X_train=X_train, epochs=100 , batch_size=batch_size, sample_interval=200)

    losssum1 = 0
    losssum2 = 0
    for i in range(1, batch_size):
        loss11 = gan.discriminator.evaluate(Y_test_normal)
        losssum1 = +loss11
        loss22 = gan.discriminator.evaluate(Y_test_abnormal)
        losssum2 = +loss22
        print(f"loss1:{loss11}")
        print(f"loss2:{loss22}")

    endTime = time.time()  # end time

    _, acc_normal = gan.discriminator.evaluate(Y_test_normal, np.ones((test_first_d, 1)))
    _, acc_abnormal = gan.discriminator.evaluate(Y_test_abnormal, np.zeros((test_first_d, 1)))
    print(f"acc:{(acc_normal+acc_abnormal)*50}%")  # mean of the two accuracies, expressed as a percentage

from sklearn.metrics import roc_curve, auc

# Prediction section
def evaluate_gan(X_test, model):
    # Get the reconstructed output
    reconstructions = model.predict(X_test)
    # Compute the reconstruction error
    reconstruction_errors = np.mean(np.abs(X_test - reconstructions), axis=(1, 2))
    return reconstruction_errors

# Compute the detection threshold
def find_threshold(y_true, y_pred):
    fpr, tpr, thresholds = roc_curve(y_true, y_pred)
    # Pick the threshold that maximizes TPR - FPR (Youden's J statistic)
    optimal_idx = np.argmax(tpr - fpr)
    optimal_threshold = thresholds[optimal_idx]
    return optimal_threshold

# Get the reconstruction errors
recon_errors_normal = evaluate_gan(Y_test_normal, gan.discriminator)
recon_errors_abnormal = evaluate_gan(Y_test_abnormal, gan.discriminator)

# Combine the data
recon_errors = np.concatenate([recon_errors_normal, recon_errors_abnormal])
labels = np.array([0]*len(recon_errors_normal) + [1]*len(recon_errors_abnormal))

# Find the optimal threshold
optimal_threshold = find_threshold(labels, recon_errors)

# Binary classification using the threshold
predictions = (recon_errors > optimal_threshold).astype(int)

from sklearn.metrics import confusion_matrix  # import the confusion-matrix function
# Compute the confusion matrix
cm = confusion_matrix(labels, predictions)
TP = cm[1, 1]
TN = cm[0, 0]
FP = cm[0, 1]
FN = cm[1, 0]

# Compute the evaluation metrics
accuracy = (TP + TN) / (TP + TN + FP + FN)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
f1 = 2 * (precision * recall) / (precision + recall)

# Compute the ROC curve and AUC
fpr, tpr, _ = roc_curve(labels, recon_errors)
roc_auc = auc(fpr, tpr)

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
print(f"AUC: {roc_auc}")

# Plot the ROC curve
import matplotlib.pyplot as plt
plt.figure()
plt.plot(fpr, tpr, label=f'AUC = {roc_auc:.2f}')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()


The errors are as follows:

[error-message screenshots, not reproduced here]

33 answers

  • 问答-莫得感情 2024-12-18 23:31
    Awarded a ¥2.25 bounty for this question

    Hello! The answer below was put together with reference to Google's AI assistant.

    Your code has the following main problems:

    1. Incorrect use of evaluate: you call gan.discriminator.evaluate(Y_test_normal) to get the loss, but evaluate returns the loss together with the compiled metrics (accuracy in your code), and it needs the true labels as its second argument. Use it like this: loss, acc = gan.discriminator.evaluate(Y_test_normal, np.ones((test_first_d, 1))). For the normal data the labels should be all ones; for the abnormal data, all zeros.

    2. evaluate inside a loop: the loop for i in range(1, batch_size): calls evaluate over and over, which achieves nothing. After training, call evaluate once on the whole normal test set and once on the whole abnormal test set.

    3. Reconstruction error is the wrong measure: a GAN is not evaluated like an autoencoder, so you should not compute a reconstruction error. The discriminator outputs the probability that a sample is real, and it is this probability you should use for anomaly detection.

    4. The combined model is trained with the wrong target: in your loop the generator update is self.combined.train_on_batch(noise, fake), which pushes the generator toward the "fake" label. The generator should be trained so that the discriminator classifies its output as real, i.e. with an all-ones target (see the sketch below). As written, the generator cannot learn effectively.
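
    For point 4, a minimal sketch of the corrected generator update inside the training loop (only these lines change; valid is the all-ones array already defined at the top of train()):

    # ---------------------
    #  Train Generator
    # ---------------------
    noise = np.random.normal(0, 1, (batch_size, self.seq_len, self.d_model))
    # Train the generator through the combined model with the ALL-ONES target,
    # i.e. ask the discriminator to classify the generated batch as real
    g_loss = self.combined.train_on_batch(noise, valid)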

    Here is the modified code, with detailed explanations:

    from __future__ import print_function, division
    import time
    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler, StandardScaler
    from sklearn.metrics import roc_curve, auc, confusion_matrix
    import matplotlib.pyplot as plt
    from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Conv1D, GRU, Dropout, InputLayer, MaxPool1D, GlobalMaxPool1D
    from tensorflow.keras.layers import BatchNormalization, Activation, ZeroPadding2D, LeakyReLU
    from tensorflow.keras.models import Sequential, Model
    from tensorflow.keras.optimizers import Adam
    
    # ... (GAN class as above, with the generator update in train() changed to use the all-ones target, as described in point 4)
    
    if __name__ == '__main__':
        # ... (Data loading remains the same)
    
        gan = GAN()
        gan.train(X_train=X_train, epochs=100, batch_size=batch_size, sample_interval=200)
    
        # Evaluate the discriminator
        loss_normal, acc_normal = gan.discriminator.evaluate(Y_test_normal, np.ones((test_first_d, 1)))
        loss_abnormal, acc_abnormal = gan.discriminator.evaluate(Y_test_abnormal, np.zeros((test_first_d, 1)))
    
        # Get discriminator predictions (probabilities)
        predictions_normal = gan.discriminator.predict(Y_test_normal).flatten()
        predictions_abnormal = gan.discriminator.predict(Y_test_abnormal).flatten()
    
        # Combine predictions and true labels
        predictions = np.concatenate([predictions_normal, predictions_abnormal])
        labels = np.array([0] * len(predictions_normal) + [1] * len(predictions_abnormal))
    
        # Find optimal threshold using ROC curve
        fpr, tpr, thresholds = roc_curve(labels, predictions)
        optimal_idx = np.argmax(tpr - fpr)
        optimal_threshold = thresholds[optimal_idx]
    
        # Classify based on the threshold
        binary_predictions = (predictions > optimal_threshold).astype(int)
    
        # Calculate confusion matrix and metrics
        cm = confusion_matrix(labels, binary_predictions)
        TP = cm[1, 1]
        TN = cm[0, 0]
        FP = cm[0, 1]
        FN = cm[1, 0]
    
        accuracy = (TP + TN) / (TP + TN + FP + FN)
        precision = TP / (TP + FP) if (TP + FP) != 0 else 0  # Handle potential division by zero
        recall = TP / (TP + FN) if (TP + FN) != 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) != 0 else 0
        roc_auc = auc(fpr, tpr)
    
        # ... (Print metrics and plot ROC curve - this part remains the same)
    
    
    

    Key changes:

    • Use discriminator.predict to obtain the discriminator's output probabilities.
    • Use those output probabilities, rather than a reconstruction error, for anomaly detection.
    • Fixed the use of evaluate and removed the unnecessary loop.
    • Added guards against division by zero.

    Other suggestions:

    • Activation function: the generator's output layer already uses tanh; to match it, scale the input data to the range [-1, 1] (see the sketch below).
    • LeakyReLU: using LeakyReLU instead of ReLU in the discriminator often works better.
    • Training the combined model: make sure the generator is actually trained through the combined model in the training loop; this is critical for GAN performance. The update should be g_loss = self.combined.train_on_batch(noise, valid), where valid is an all-ones array, meaning you want the generator's samples to be judged real by the discriminator.
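
    A minimal sketch of the first two suggestions. X_train_2d is a hypothetical name for the raw 2-D feature matrix read from the CSV (before reshaping), and the layer widths are illustrative, not tuned:

    import numpy as np
    from sklearn.preprocessing import MinMaxScaler
    from tensorflow.keras.layers import Input, Conv1D, Dense, GlobalMaxPool1D, LeakyReLU
    from tensorflow.keras.models import Model

    # Scale the features to [-1, 1] so real samples match the generator's tanh output range
    scaler = MinMaxScaler(feature_range=(-1, 1))
    X_train = np.nan_to_num(scaler.fit_transform(X_train_2d)).reshape(-1, 100, 46)

    def build_discriminator(img_shape=(100, 46)):
        inp = Input(shape=img_shape)
        x = Conv1D(256, kernel_size=3, strides=2, padding='same')(inp)
        x = LeakyReLU(0.2)(x)   # LeakyReLU instead of ReLU
        x = Dense(128)(x)
        x = LeakyReLU(0.2)(x)
        x = GlobalMaxPool1D()(x)
        out = Dense(1, activation='sigmoid')(x)
        return Model(inp, out)

    Scaling to [-1, 1] keeps the real samples in the same value range as the tanh-generated ones, so the discriminator cannot separate real from fake by range alone.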

    With these changes, your code should run and report the evaluation metrics and the ROC curve. Keep in mind that GAN training can be finicky; you may need to tune the hyperparameters and the network architecture carefully to get good results.



Question events

  • Closed by the system on Dec 26
  • A ¥15 bounty was added on Dec 18
  • Question created on Dec 18