Python code for a VAE with CNN layers added, applied to the CICIoT2023 IoT dataset: the decoder dimensions don't line up and the script keeps raising errors. The code, the printed model information, and the error message follow, with a shape-consistent sketch at the end for comparison.
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Lambda, Conv1D, Flatten, Activation, SpatialDropout1D, Reshape
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras import backend as K
from sklearn.preprocessing import MinMaxScaler, StandardScaler
# Load the dataset
# Path to the training CSV
csv_path_train = 'CICIoT2023/CICIoT2023/benign.csv'
# Read the data, scale it, and group it into windows of 100 rows x 46 features
X_train = pd.read_csv(csv_path_train)
X_train = X_train.values
X_train = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(X_train)))
X_train = np.reshape(X_train, (-1, 100, 46))
print(f"train:{X_train.shape}")
idx = np.random.randint(0, X_train.shape[0], 16)
imgs = X_train[idx]
# print(imgs.shape)
print(f"imgs:{imgs.shape}")
# noise = np.random.normal(0, 1, (16, 100, 1))
# # print(noise.shape)
# print(f"noise:{noise.shape}")
# Path to the test CSV
csv_path_test = 'CICIoT2023/CICIoT2023/ceshi.csv'
Y_test = pd.read_csv(csv_path_test)
# Split the test set into benign and attack rows, scale each subset, and window it the same way as the training data
Y_test_normal = Y_test[Y_test.label == 'BenignTraffic'].drop(labels='label', axis=1).values
Y_test_normal = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(Y_test_normal)))
Y_test_abnormal = Y_test[Y_test.label != 'BenignTraffic'].drop(labels='label', axis=1).values
Y_test_abnormal = np.nan_to_num(MinMaxScaler().fit_transform(StandardScaler().fit_transform(Y_test_abnormal)))
Y_test_normal = np.reshape(Y_test_normal, (-1, 100, 46))
Y_test_abnormal = np.reshape(Y_test_abnormal, (-1, 100, 46))
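As a quick, self-contained illustration (random data standing in for the CSV rows, an assumption made only for this snippet): the reshape above turns every 100 consecutive rows into one (timesteps, features) = (100, 46) window, and that per-sample shape is what the encoder's Input layer has to be given later.

import numpy as np
rows = np.random.rand(3200, 46)      # stand-in for 3200 CSV rows with 46 features each
windows = rows.reshape(-1, 100, 46)  # group rows into windows of 100 timesteps -> (32, 100, 46)
print(windows.shape[1:])             # (100, 46): the shape to pass to Input(shape=...)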
# Define VAE architecture
batch_size = 100
original_dim = 46        # features per timestep
latent_dim = 2
intermediate_dim = 256
epochs = 50
def sampling(args):
    """Reparameterization trick: z = mean + exp(0.5 * log_var) * epsilon."""
    z_mean, z_log_var = args
    print(f"z_mean:{z_mean.shape}")
    # Sample epsilon with the same shape as z_mean so the product broadcasts element-wise
    epsilon = K.random_normal(shape=K.shape(z_mean))
    return z_mean + K.exp(0.5 * z_log_var) * epsilon
# Each training sample is a (timesteps, features) = (100, 46) window, so the Input shape has to match it
inputs = Input(shape=(100, original_dim))
# Convolutional feature extractor
h = Conv1D(1024, kernel_size=3, strides=2, padding='same', activation='relu')(inputs)
# Mean output layer; its dimension should match the latent space
z_mean = Dense(512, activation='relu')(h)
# Log-variance output layer, same shape as the mean
z_log_var = Dense(512, activation='relu')(h)
# Sampling layer: draw latent samples with the reparameterization trick
z = Lambda(sampling)([z_mean, z_log_var])
# (Unused) TCN-style dilated convolution block left over from an earlier attempt
#def tcn_layer(x1, dilation_rate):
#    x1 = Conv1D(filters=64, kernel_size=2, dilation_rate=dilation_rate, padding='causal')(x1)
#    x1 = Activation('relu')(x1)
#    x1 = SpatialDropout1D(0.2)(x1)
#    return x1
#encoder = Model(inputs, [z_mean, z_log_var,z], name='encoder')
def build_encoder():
    """Sequential encoder: Conv1D -> SpatialDropout1D -> Flatten -> Dense(2, softmax)."""
    model = Sequential()
    model.add(inputs)
    model.add(Conv1D(filters=64, kernel_size=2, activation='relu'))
    model.add(SpatialDropout1D(0.2))
    model.add(Flatten())
    model.add(Dense(2, activation='softmax'))
    model.summary()
    return model
encoder = build_encoder()
latent_inputs = Input(shape=(latent_dim, 100))
x = Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = Dense(original_dim, activation='sigmoid')(x)
#decoder = Model(latent_inputs, outputs, name='decoder')
def build_decoder():
    model = Sequential()
    model.add(latent_inputs)
    model.add(Conv1D(filters=64, kernel_size=2, activation='relu'))
    model.add(SpatialDropout1D(0.2))
    model.add(Dense(784, activation='sigmoid'))        # fully-connected layer with 784 output units
    model.add(Dense(units=2 * 784, activation='sigmoid'))
    model.add(Flatten())
    model.add(Reshape((1, 1)))
    model.add(Dense(2, activation='softmax'))
    model.summary()
    return model
decoder = build_decoder()
print(f"encoder(inputs):{encoder(inputs)}")
encoded_output = encoder(inputs)# 首先,获取编码器的输出
print(encoded_output.shape) # 这将显示编码器的输出形状
# 假设 encoded_output 是你的二维张量
encoded_output = tf.expand_dims(encoded_output, axis=-1)# 或者 axis=1,取决于你的需求
#encoded_output.layers[0] = encoded_output.reshape(2,100)
encoded_output = tf.reshape(encoded_output, [2, 100, 100])
outputs = decoder(encoded_output)#再获取译码器的输出
vae = Model(inputs, outputs, name='vae')
# Define VAE loss
reconstruction_loss = tf.keras.losses.binary_crossentropy(inputs, outputs)
reconstruction_loss *= original_dim
kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
kl_loss = K.sum(kl_loss, axis=-1)
kl_loss *= -0.5
vae_loss = K.mean(reconstruction_loss + kl_loss)
vae.add_loss(vae_loss)
vae.compile(optimizer='adam')
# Train VAE on the benign windows and validate on the windowed benign test data
vae.fit(X_train, epochs=10, batch_size=32, validation_data=(Y_test_normal, None))
The printed model information is as follows:


The error message is as follows:


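The traceback text is not reproduced above, but a few dimension problems in the decoder path stand out regardless: the decoder's Flatten followed by Reshape((1, 1)) cannot preserve the number of elements; its Input is declared as (latent_dim, 100) even though the Sequential encoder ends in a Dense(2) head; and tf.reshape(encoded_output, [2, 100, 100]) forces a fixed [2, 100, 100] layout that a (batch, 2, 1) tensor generally cannot be reshaped into. For comparison, below is a minimal, self-contained sketch of a shape-consistent convolutional VAE for (100, 46) windows, assuming TensorFlow 2.3+ (for Conv1DTranspose). It is only an illustration, not the original architecture: the filter counts, the 256-unit dense layer, the transposed-convolution decoder, the subclassed train_step, and the random stand-in data are all choices made for this sketch.

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

timesteps, features, latent_dim = 100, 46, 2   # (100, 46) windows, 2-D latent space (assumed sizes)

class Sampling(layers.Layer):
    """Reparameterization trick: z = mean + exp(0.5 * log_var) * epsilon."""
    def call(self, inputs):
        z_mean, z_log_var = inputs
        epsilon = tf.random.normal(shape=tf.shape(z_mean))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

# Encoder: (100, 46) -> z_mean, z_log_var, z
enc_in = keras.Input(shape=(timesteps, features))
x = layers.Conv1D(64, 3, strides=2, padding="same", activation="relu")(enc_in)   # -> (50, 64)
x = layers.Conv1D(32, 3, strides=2, padding="same", activation="relu")(x)        # -> (25, 32)
x = layers.Flatten()(x)
x = layers.Dense(256, activation="relu")(x)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)        # linear outputs, sized to the latent space
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(enc_in, [z_mean, z_log_var, z], name="encoder")

# Decoder: z -> (100, 46), mirroring the encoder so the output matches the input windows
dec_in = keras.Input(shape=(latent_dim,))
x = layers.Dense(25 * 32, activation="relu")(dec_in)
x = layers.Reshape((25, 32))(x)
x = layers.Conv1DTranspose(32, 3, strides=2, padding="same", activation="relu")(x)  # -> (50, 32)
x = layers.Conv1DTranspose(64, 3, strides=2, padding="same", activation="relu")(x)  # -> (100, 64)
dec_out = layers.Conv1D(features, 3, padding="same", activation="sigmoid")(x)       # -> (100, 46)
decoder = keras.Model(dec_in, dec_out, name="decoder")

class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def call(self, x):
        _, _, z = self.encoder(x)
        return self.decoder(z)

    def train_step(self, data):
        if isinstance(data, tuple):   # fit() may hand over (x,) or (x, y)
            data = data[0]
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            # Reconstruction term: binary cross-entropy summed over each window
            recon_loss = tf.reduce_mean(tf.reduce_sum(
                keras.losses.binary_crossentropy(data, reconstruction), axis=-1))
            # KL term: -0.5 * sum(1 + log_var - mean^2 - exp(log_var))
            kl_loss = -0.5 * tf.reduce_mean(tf.reduce_sum(
                1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=-1))
            total_loss = recon_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return {"loss": total_loss, "reconstruction": recon_loss, "kl": kl_loss}

vae = VAE(encoder, decoder)
vae.compile(optimizer="adam")
# Random stand-in shaped like the windowed CICIoT2023 data above: (N, 100, 46)
X_dummy = np.random.rand(64, timesteps, features).astype("float32")
vae.fit(X_dummy, epochs=1, batch_size=16)

With this layout the decoder mirrors the encoder, so the reconstruction has the same (100, 46) shape as each input window, and the per-window reconstruction error can be compared directly between benign and attack traffic.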