# 感觉测试效果很差不知道是怎么回事 (the test accuracy is very poor and I am not sure why)
# 希望指点一波 (hoping someone can point me in the right direction)
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # suppress low-severity TensorFlow C++ log messages (must be set before importing tensorflow)
from sklearn.metrics import confusion_matrix  # NOTE(review): imported but never used in this file
import tensorflow as tf
from matplotlib import pyplot as plt
import numpy as np
from scipy.fftpack import fft
import time
# Wall-clock start time; total runtime is printed at the end of the script.
now_time = time.time()
def yuchuli(data, label, n_seg=30, seg_len=400, depth=6):
    """Load one class's raw vibration signal and build (features, labels).

    The head of the signal is cut into ``n_seg`` segments of ``seg_len``
    samples each; every segment is converted to the normalized magnitude of
    the first half of its FFT spectrum (for a real signal the second half is
    a mirror image, so it carries no extra information).

    Args:
        data: path to a whitespace-delimited text file of signal samples.
        label: integer class label applied to every segment.
        n_seg: number of segments to cut from the signal head.
        seg_len: samples per segment (also the FFT length).
        depth: one-hot encoding depth.

    Returns:
        data1: float32 tensor of shape (n_seg, seg_len // 2).
        lab1: one-hot label tensor of shape (n_seg, depth).
    """
    raw = np.loadtxt(data, dtype=np.float32)
    segments = raw[:n_seg * seg_len].reshape(n_seg, seg_len)
    # FFT each segment along its own axis and normalize by the FFT length.
    # (The original code transposed, sliced rows, then transposed back;
    # operating on axis=1 directly produces the identical result.)
    spectrum = np.abs(fft(segments, axis=1)) / seg_len
    half = spectrum[:, :seg_len // 2]  # keep the first half of the spectrum
    data1 = tf.cast(half, dtype=tf.float32)
    # One label per segment, one-hot encoded.
    lab1 = tf.one_hot(np.full(n_seg, label), depth=depth)
    return data1, lab1
def load_csv(base_dir='C:\\Users\\pc.000\\Desktop\\数据集\\轴承数据十分类'):
    """Load and preprocess every per-class file of the bearing dataset.

    Args:
        base_dir: directory containing the per-class CSV files.

    Returns:
        20 values: (data, label) pairs for the 6 training-task classes
        (labels 0-5) followed by the 4 held-out test-task classes
        (labels restart at 0-3), in the same order as before.
    """
    # (filename, label): first six are training-task classes, last four are
    # test-task classes. NOTE(review): the test classes deliberately reuse
    # labels 0-3 because each meta-test task is 4-way — confirm intended.
    files = [
        ('normal_x098_de.csv', 0),
        ('b07_de.csv', 1),
        ('b14_de.csv', 2),
        ('b21_de.csv', 3),
        ('ir07_de.csv', 4),
        ('ir14_de.csv', 5),
        ('ir21_de.csv', 0),
        ('or07@6_de.csv', 1),
        ('or14@6_de.csv', 2),
        ('or21@6_de.csv', 3),
    ]
    out = []
    for name, label in files:
        d, l = yuchuli(os.path.join(base_dir, name), label)
        out.extend((d, l))
    return tuple(out)
# Load all ten preprocessed classes (6 training-task + 4 test-task classes).
data1, lab1, data2, lab2, data3, lab3, data4, lab4, data5, lab5, data6, lab6, data7, lab7, data8, lab8, data9, lab9, data10, lab10 = load_csv()
# Stack the six TRAINING classes so tasks can be sampled by class index.
lab = (lab1, lab2, lab3, lab4, lab5, lab6)
data = (data1, data2, data3, data4, data5, data6)
# m_arr: labels per class; n_arr: features per class.
# Presumably shapes (6, 30, 6) and (6, 30, 200) — TODO confirm at runtime.
m_arr = np.array(lab)
n_arr = np.array(data)
# Sampling helper: build one class's support/query split for a task.
def qushu(d, l, n_pool=30, n_draw=20, n_support=5):
    """Randomly sample a support/query split for one class.

    Args:
        d: per-class data, shape (n_pool, feat); numpy array or eager tensor.
        l: per-class one-hot labels, shape (n_pool, depth).
        n_pool: number of samples available in the class.
        n_draw: how many samples to draw without replacement
            (the original comment said 10, but 20 were actually drawn).
        n_support: how many drawn samples form the support set; the remaining
            n_draw - n_support form the query set.

    Returns:
        (support_data, support_labels, query_data, query_labels) as numpy
        arrays, e.g. shapes (5, feat), (5, depth), (15, feat), (15, depth)
        with the defaults.
    """
    idx = np.random.choice(n_pool, n_draw, replace=False)
    # np.asarray handles both numpy arrays and TF eager tensors, and lets us
    # use one fancy-index instead of the original append loop.
    d_arr = np.asarray(d)[idx]
    l_arr = np.asarray(l)[idx]
    return (d_arr[:n_support], l_arr[:n_support],
            d_arr[n_support:], l_arr[n_support:])
# Build the training-task distribution: sample 100 tasks. Each task is 4-way
# with a 5-shot support set (4x5 = 20 samples, 20x200 features) and a
# 15-sample-per-class query set (4x15 = 60 samples, 60x200 features).
# NOTE(review): the original comments say 400-dim features, but yuchuli keeps
# only the half-spectrum (200 values) — the dims below are presumably 200.
"""构造训练任务分布,抽一百次,有一百个训练任务,每个任务的支持集样本数:4×5=20;查询集样本数:4×15=60,
每个任务的支持集维度:20×400;查询集维度:60×400;"""
x_s = []  # per-task support-set data
y_s = []  # per-task support-set labels
x_q = []  # per-task query-set data
y_q = []  # per-task query-set labels
for i in range(100):
    # Pick 4 of the 6 training classes without replacement (4-way task).
    suo = np.random.choice(6, 4, replace=False)
    lab_1 = m_arr[suo[0], :]
    data_1 = n_arr[suo[0], :]
    lab_2 = m_arr[suo[1], :]
    data_2 = n_arr[suo[1], :]
    lab_3 = m_arr[suo[2], :]
    data_3 = n_arr[suo[2], :]
    lab_4 = m_arr[suo[3], :]
    data_4 = n_arr[suo[3], :]
    """data1: 30×400 label: 30×4 t_d_s0: 5×400; t_l_s0: 5×4; t_d_s0: 5×400; t_l_q0: 5×5"""
    # For each chosen class, split its 30 samples into 5 support / 15 query.
    t_d_s0, t_l_s0, t_d_q0, t_l_q0 = qushu(data_1, lab_1)
    t_d_s1, t_l_s1, t_d_q1, t_l_q1 = qushu(data_2, lab_2)
    t_d_s2, t_l_s2, t_d_q2, t_l_q2 = qushu(data_3, lab_3)
    t_d_s3, t_l_s3, t_d_q3, t_l_q3 = qushu(data_4, lab_4)
    # Stack the four classes into one task (4-way 5-shot, 15-query).
    x1_s = np.vstack((t_d_s0, t_d_s1, t_d_s2, t_d_s3))
    y1_s = np.vstack((t_l_s0, t_l_s1, t_l_s2, t_l_s3))
    x1_q = np.vstack((t_d_q0, t_d_q1, t_d_q2, t_d_q3))
    y1_q = np.vstack((t_l_q0, t_l_q1, t_l_q2, t_l_q3))
    x_s.append(x1_s)
    y_s.append(y1_s)
    x_q.append(x1_q)
    y_q.append(y1_q)
# x_s etc. are lists; x_q[i] is the i-th sampled task.
"""x_s是一个列表,x_q[i]是抽取的第i个任务"""
# Parameter initialization for the 3-hidden-layer fully connected network.
def init_fen(n1, n_h1, n_h2, n_h3, n2):
    """Create the weights and biases of a 4-layer MLP.

    Weights are drawn from a truncated normal (values re-drawn outside two
    standard deviations) with stddev 1/sqrt(fan_in); biases start at zero.

    Returns:
        (w1, b1, w2, b2, w3, b3, w4, b4) as tf.Variables.
    """
    layer_sizes = [n1, n_h1, n_h2, n_h3, n2]
    variables = []
    for fan_in, fan_out in zip(layer_sizes[:-1], layer_sizes[1:]):
        weight = tf.Variable(
            tf.random.truncated_normal([fan_in, fan_out], stddev=1. / tf.sqrt(float(fan_in))))
        bias = tf.Variable(tf.zeros([fan_out]))
        variables.extend((weight, bias))
    return tuple(variables)
# Forward pass of the MLP.
def fe_forw(x, w1, b1, w2, b2, w3, b3, w4, b4):
    """Run the network: three ReLU hidden layers, then a softmax output."""
    activation = x
    for weight, bias in ((w1, b1), (w2, b2), (w3, b3)):
        activation = tf.nn.relu(tf.matmul(activation, weight) + bias)
    logits = tf.matmul(activation, w4) + b4
    return tf.nn.softmax(logits)
# Inner-loop computation for one training task.
def meta_inner_train(w1, b1, w2, b2, w3, b3, w4, b4, x_s, y_s, x_q, y_q):
    """Adapt on one task's support set, then return its query-set gradients.

    Runs up to 6 Adam steps (epos 0..5) of MSE minimization on the support
    set, stopping early if loss < 0.001 or support accuracy reaches 1.0.
    Afterwards, the gradient of the query-set loss w.r.t. the adapted
    parameters is computed (but NOT applied here — the outer loop aggregates
    it across tasks to update the meta model).

    Returns:
        The 8 query-set gradients followed by the query-set loss.

    NOTE(review): the parameters are tf.Variables updated IN PLACE, so the
    meta parameters themselves drift during inner adaptation (a MAML inner
    loop would normally adapt a per-task copy) — confirm this is intended.
    """
    params = [w1, b1, w2, b2, w3, b3, w4, b4]
    # BUG FIX: the optimizer used to be re-created on EVERY iteration, which
    # reset Adam's moment estimates each step (making it behave like plain
    # SGD with a warm-up). Create it once, outside the loop, and use the
    # non-deprecated `learning_rate` keyword.
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
    max_epos = 5
    for epos in range(max_epos + 1):
        with tf.GradientTape() as tape:
            a4_s = fe_forw(x_s, *params)
            # Mean squared error over the support set (here 20 samples).
            loss = tf.reduce_mean(tf.square(y_s - a4_s))
        grads = tape.gradient(loss, params)
        optimizer.apply_gradients(zip(grads, params))
        # Support-set accuracy for monitoring and early stopping.
        la = tf.argmax(a4_s, axis=1)
        y_tr_1 = tf.argmax(y_s, axis=1)
        cor_tr1 = tf.reduce_sum(tf.cast(tf.equal(la, y_tr_1), dtype=tf.int32))
        num1 = x_s.shape[0]  # number of support samples
        acc_s = cor_tr1 / num1
        print('epos = {}, loss = {:.4f}, acc_s = {:.4f}, 对了 = {}, 支持集总数 = {}'.format(epos, loss, acc_s, cor_tr1, num1))
        if loss < 0.001 or acc_s == 1.0:
            break
    # Query-set loss/gradient on the adapted parameters (no update applied).
    with tf.GradientTape() as tape:
        a4_q = fe_forw(x_q, *params)
        losq = tf.reduce_mean(tf.square(y_q - a4_q))  # mean query-set loss
    gradq = tape.gradient(losq, params)
    return (*gradq, losq)
# Outer-loop computation over all training tasks.
def meta_outer_train(w1, b1, w2, b2, w3, b3, w4, b4, x_s, y_s, x_q, y_q):
    """Meta-train the model over the 100 sampled training tasks.

    Each meta-epoch runs the inner loop on every task, sums the query-set
    gradients across all tasks (one batch of 100 tasks), and applies one
    SGD-style meta update with learning rate ``lr_y``.

    Returns:
        The trained meta parameters (w1, b1, ..., w4, b4).
    """
    lr_y = 0.005  # meta (outer-loop) learning rate
    max_epos = 6  # meta-epochs run for epo_s = 0..6 inclusive
    params = [w1, b1, w2, b2, w3, b3, w4, b4]
    for epo_s in range(max_epos + 1):
        # BUG FIX: the gradient accumulators and the summed loss were never
        # reset between meta-epochs, so stale gradients from earlier epochs
        # were re-applied with ever-growing magnitude (a likely cause of the
        # poor test results). Reset them at the start of every epoch.
        grad_sums = [0, 0, 0, 0, 0, 0, 0, 0]
        losq = 0
        for i in range(100):  # one inner adaptation per training task
            print('在训练第{}个任务'.format(i))
            *task_grads, losq_t = meta_inner_train(*params, x_s[i], y_s[i], x_q[i], y_q[i])
            # Aggregate this task's query-set gradients (single batch of 100).
            for j in range(8):
                grad_sums[j] += task_grads[j]
            losq += losq_t
        # Meta update: apply the gradients summed over all 100 tasks.
        for p, g in zip(params, grad_sums):
            p.assign_sub(lr_y * g)
        print('*' * 150)
        print('*' * 150)
        print('epo_s = {}, losq = {:.4f}'.format(epo_s, losq))
        print('*' * 150)
        print('*' * 150)
    return tuple(params)
# Meta fine-tuning on one test task.
def meta_fine_tune(w1, b1, w2, b2, w3, b3, w4, b4, x_s, y_s, x_q, y_q):
    """Fine-tune on a test task's support set, then evaluate on its query set.

    Runs up to 101 Adam steps (epos 0..100) of MSE minimization on the
    support set, stopping early if loss < 0.001 or support accuracy hits 1.0,
    then measures classification accuracy on the query set.

    Returns:
        acc_q: query-set accuracy of the fine-tuned model.

    NOTE(review): the tf.Variables passed in are updated IN PLACE; callers
    should pass per-task copies of the meta parameters so that tasks do not
    contaminate each other.
    """
    params = [w1, b1, w2, b2, w3, b3, w4, b4]
    # BUG FIX: the optimizer used to be re-created on every iteration, which
    # reset Adam's moment estimates each step; create it once instead and use
    # the non-deprecated `learning_rate` keyword. (The old unused `lr = 0.005`
    # local has been removed.)
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.1)
    max_epos = 100  # the test task's support set may be iterated many times
    for epos in range(max_epos + 1):
        with tf.GradientTape() as tape:
            a4_s = fe_forw(x_s, *params)
            loss = tf.reduce_mean(tf.square(y_s - a4_s))
        grads = tape.gradient(loss, params)
        optimizer.apply_gradients(zip(grads, params))
        # Support-set accuracy during fine-tuning (monitoring + early stop).
        la = tf.argmax(a4_s, axis=1)
        y_tr_1 = tf.argmax(y_s, axis=1)
        cor_tr = tf.reduce_sum(tf.cast(tf.equal(la, y_tr_1), dtype=tf.int32))
        num = x_s.shape[0]
        acc_s = cor_tr / num
        print('epos = {}, 微调loss = {:.4f}, 微调acc_s = {:.4f}, 对了 = {}, 样本数 = {}'.format(epos, loss, acc_s, cor_tr, num))
        if loss < 0.001 or acc_s == 1.0:
            break
    # The real test: evaluate on the task's query set.
    a4_q = fe_forw(x_q, *params)
    laq = tf.argmax(a4_q, axis=1)
    y_te_1 = tf.argmax(y_q, axis=1)
    cor_te = tf.reduce_sum(tf.cast(tf.equal(laq, y_te_1), dtype=tf.int32))
    numq = x_q.shape[0]
    acc_q = cor_te / numq  # query-set accuracy
    return acc_q
# Evaluate the trained meta model on freshly sampled test tasks.
def online_test(w1, b1, w2, b2, w3, b3, w4, b4, n_test):
    """Sample ``n_test`` test tasks, fine-tune on each, and report accuracy.

    Returns:
        acc: mean query-set accuracy over the ``n_test`` tasks (averaging
            guards against a single extreme task draw).
        acc_all: list of per-task query-set accuracies.
    """
    acc_all = []
    for i in range(n_test):
        print('在测试第{}个任务'.format(i))
        # Sample a 4-way task from the four held-out classes.
        t_d_s0, t_l_s0, t_d_q0, t_l_q0 = qushu(data7, lab7)
        t_d_s1, t_l_s1, t_d_q1, t_l_q1 = qushu(data8, lab8)
        t_d_s2, t_l_s2, t_d_q2, t_l_q2 = qushu(data9, lab9)
        t_d_s3, t_l_s3, t_d_q3, t_l_q3 = qushu(data10, lab10)
        x_s = np.vstack((t_d_s0, t_d_s1, t_d_s2, t_d_s3))
        y_s = np.vstack((t_l_s0, t_l_s1, t_l_s2, t_l_s3))
        x_q = np.vstack((t_d_q0, t_d_q1, t_d_q2, t_d_q3))
        y_q = np.vstack((t_l_q0, t_l_q1, t_l_q2, t_l_q3))
        # BUG FIX: the SAME meta variables used to be fine-tuned for every
        # task, so task i+1 started from task i's fine-tuned weights and the
        # reported accuracies were not independent. Give each task its own
        # fresh copy of the meta parameters instead.
        task_params = [tf.Variable(p) for p in (w1, b1, w2, b2, w3, b3, w4, b4)]
        acc_q = meta_fine_tune(*task_params, x_s, y_s, x_q, y_q)
        acc_all.append(acc_q)
        print('i = {}, acc_q = {:.4f}'.format(i, acc_q))
    acc = tf.reduce_mean(acc_all)  # average over tasks to smooth extremes
    return acc, acc_all
n_train = 100 # number of training tasks (sampled above)
n_test = 10 # number of test tasks; one would suffice, but averaging several query-set accuracies guards against extreme draws
# Initialize the meta-model parameters.
# Input dim 200 = half-spectrum length produced by yuchuli; output dim 6 = one-hot depth.
w1_0, b1_0, w2_0, b2_0, w3_0, b3_0, w4_0, b4_0 = init_fen(200, 600, 200, 50, 6)
# Meta-training: produce the trained meta-model parameters.
w1_1, b1_1, w2_1, b2_1, w3_1, b3_1, w4_1, b4_1 = meta_outer_train(w1_0, b1_0, w2_0, b2_0, w3_0, b3_0, w4_0, b4_0, x_s, y_s, x_q, y_q)
# Meta-testing on the held-out task classes.
acc, acc_all = online_test(w1_1, b1_1, w2_1, b2_1, w3_1, b3_1, w4_1, b4_1, n_test)
print('*' * 50)
print('acc = ', acc)
total_time = time.time() - now_time
print("total_time", total_time)
print(acc_all)
# Plot the per-test-task accuracy curve.
plt.figure()
plt.title('Acc Curve') # figure title
plt.xlabel('i') # x-axis: test-task index
plt.ylabel('Acc') # y-axis: query-set accuracy
plt.plot(acc_all, label="$Accuracy$") # plot each task's accuracy and connect the points
plt.legend()
plt.show()