做这个dbn是无监督学习效果很差,我给他加了标签,计算损失函数,但是反向传播维度不匹配,有佬知道怎么回事吗?
主要是激活函数这部分的问题,数据输入维度(3200,8)label(3200,1)
报错:
H=x @ self.w+self.a.T
ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 2 is different from 1)
完整代码:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from torchvision.datasets import MNIST
np.random.seed(2)
from tqdm import tqdm
import matplotlib.pyplot as plt
class RBM():
def __init__(self,x_layer,h_layer):
'''
:param x_num: 可见层维度
:param h_num: 隐藏层维度
'''
self.x_layer=x_layer #可见层的维度
self.h_layer=h_layer #隐藏层的维度
self.w=np.random.normal(0, 0.1, size=(self.x_layer, self.h_layer)) #从正态分布中随机采样w
self.a=np.random.normal(0, 0.1, size=(self.h_layer,1)) #从正态分布中随机采样a
self.b=np.random.normal(0, 0.1, size=(self.x_layer,1)) #从正态分布中随机采样b
self.learning_rate=0.1 #学习率
def train(self,x,K):
'''
:param x: 训练数据
:param K: 使用k次吉布斯采样
:return:
'''
x_num=x.shape[0] #样本的个数
for _ in tqdm(np.arange(100),desc="梯度上升"): #梯度上升迭代10000次
x0=x
#################
#CD-K吉布斯采样
for _ in np.arange(K): #吉布斯采样K次
P_h=self.sigmoid_Ph_x(x0) #从v0计算出P(h=1|v0)
#从P(h=1|v0)采样出h0
h0=np.random.binomial(1,p=P_h,size=(x_num,self.h_layer))
#计算出P(v|h0)
P_x=self.sigmoid_Px_h(h0)
#采样出v
x0=np.random.binomial(1,p=P_x,size=(x_num,self.x_layer))
#################
#真实数据的P(h=1|x)
true_h =self.sigmoid_Ph_x(x)
#采样数据的P(h=1|x)
x_sample_h=self.sigmoid_Ph_x(x0)
#w梯度
w_GD=(x.T@true_h-x0.T@x_sample_h)/x_num
#a梯度
a_GD=np.mean(true_h-x_sample_h,axis=0).reshape(-1,1)
#b梯度
b_GD=np.mean(x-x0,axis=0).reshape(-1,1)
#梯度下降
self.w+=self.learning_rate*w_GD
self.a+=self.learning_rate*a_GD
self.b+=self.learning_rate*b_GD
def sigmoid_Ph_x(self,x):
'''
计算P(h=1|x)
:param x: 数据
:return:
'''
print(x.shape)
print(self.w.shape)
H=x @ self.w+self.a.T
result=1/(1+np.exp(-H))
return result
def sigmoid_Px_h(self,h):
'''
计算P(x=1|h)
:param h:
:return:
'''
H=(self.w @ h.T + self.b).T
result=1/(1+np.exp(-H))
return result
# def sigmoid_Ph_x2(self,x):
# '''
# 计算P(h=1|x)
# :param x: 数据
# :return:
# '''
# print(x.shape)
# print(self.w.shape)
# H=x @ self.w.T+self.a.T
# result=1/(1+np.exp(-H))
# return result
#
# def sigmoid_Px_h2(self,h):
# '''
# 计算P(x=1|h)
# :param h:
# :return:
# '''
# H=(self.w @ h.T + self.b).T
# result=1/(1+np.exp(-H))
# return result
class DBN():
def __init__(self, layer):
self.layer = layer
layer_num = len(layer) # 计算有多少层
self.RBMS = [] # 储存多个受限玻尔兹曼机
for i in np.arange(layer_num - 1): # 迭代初始化多个受限玻尔兹曼机
rbm = RBM(layer[i], layer[i + 1])
self.RBMS.append(rbm)
def train(self, data, labels, k, num_epochs):
for epoch in range(num_epochs):
for rbm in self.RBMS: # 迭代训练每一个RBM
rbm.train(data, k) # 训练
p = rbm.sigmoid_Ph_x(data) # 计算出下一层的概率
data = np.random.binomial(1, p, size=p.shape) # 根据概率采样
x_train = data
y_train = labels
# 计算预测结果
x_train_pred = data # 直接使用数据作为预测结果
# 计算损失和梯度
loss = self.cross_entropy_loss(y_train, x_train_pred)
dW = np.zeros_like(self.RBMS[-1].w)
da = np.zeros_like(self.RBMS[-1].a)
db = np.zeros_like(self.RBMS[-1].b)
# 反向传播
for rbm in reversed(self.RBMS):
dy = (x_train_pred - y_train) * rbm.sigmoid_Px_h(x_train)
print(dy.shape)
# print( "ceshi:",rbm.sigmoid_Ph_x2(data))
dW += rbm.learning_rate * np.dot(dy.T, rbm.sigmoid_Ph_x(data))
da += rbm.learning_rate * np.mean(dy, axis=0)
db += rbm.learning_rate * np.mean(rbm.sigmoid_Ph_x(data) - x_train_pred, axis=0)
# 更新数据
x_train = rbm.sigmoid_Ph_x(x_train)
# 更新权重
self.RBMS[-1].w += dW
self.RBMS[-1].a += da
self.RBMS[-1].b += db
# 打印损失和准确率
print(
f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss}, Accuracy: {self.accuracy(y_train, x_train_pred)}")
def cross_entropy_loss(self, y_true, y_pred):
y_true = y_true.reshape(-1, 1)
y_pred = y_pred.reshape(-1, self.layer[-1])
loss = -np.mean(y_true * np.log(y_pred + 1e-7)) # 添加正则化以避免数值稳定性问题
return loss
def accuracy(self, y_true, y_pred):
y_true = y_true.reshape(-1, 1)
y_pred = np.argmax(y_pred, axis=1)
return np.mean(y_true == y_pred)
def predict(self, x):
x = x.astype(float)
for rbm in self.RBMS:
p = rbm.sigmoid_Ph_x(x)
x = np.random.binomial(1, p, size=p.shape)
x = np.array(x)
x_shape = x.shape
x = x.flatten()
x = self.RBMS[-1].softmax(x)
x = x.reshape(x_shape)
return x
if __name__ == '__main__':
k=2
data = pd.read_csv('merged.csv')
X = data.drop('type', axis=1).values
y = data['type'].values
y = y.reshape(-1, 1)
epochs = 10
print(X.shape)
print(y.shape)
# 数据预处理
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(X_train.shape)
print(X_test.shape)
dbn = DBN([8, 4, 2,1]) # 初始化,第一层1000神经元,第二场1000,以此类推
dbn.train(X_train, y_train, k, epochs) # 训练
result = dbn.predict(X_test) # 预测
# 打印预测结果
print(result)
# 计算准确率
accuracy = dbn.accuracy(y_test, result)
print(f"Accuracy: {accuracy}")
print(result)