目前程序存在一些错误导致无法运行,还请大佬能帮忙实现这个程序的功能,原始数据28672行,测试集1666行,原始数据5列,测试数据4列,n代表正常,f代表故障,7个特征值公式程序已有,目标是在1s间隔内提取4*7个样本点,提取1*n组,得到类似图片所示得测试集数据~
import numpy as np import random import math from scipy.fftpack import fft, ifft import matplotlib.pyplot as plt from matplotlib.pylab import mpl import re mpl.rcParams['font.sans-serif'] = ['SimHei'] # 显示中文 mpl.rcParams['axes.unicode_minus'] = False # 显示负号 ##傅里叶变换 with open("pm13.txt", 'r') as f: _str = f.read() _str = re.sub('\t', r',', _str) _str = re.sub(' ', r'', _str) _str = re.sub(',\n', r'\n', _str) with open("pm13_data.txt", 'w') as f: f.write(_str) data = np.loadtxt('pm13_data.txt', delimiter=',', skiprows=0) # delimiter表示分隔列,skiprows作用跳过头行 s1x = [y[0] for y in data] # 第一列元素 s1y = [y[1] for y in data] # 第二列元素 s2x = [y[2] for y in data] # 第三列元素 s2y = [y[3] for y in data] # 第四列元素 fs = 1280 # fs表示采样频率 N = 833 # N为采样点数 fStart = fs*10.5 fEnd = fStart+N # n为正常,f为故障,取1.5~2.5s的正常数据,与10.5~结束的故障数据 # 原始数据的第一列和第二列进行傅里叶变换 sx_n = s1x[int(1.5*N): int(2.5*N)]-np.mean(s1x[int(1.5*N): int(2.5*N)]) sy_n = s1y[int(1.5*N): int(2.5*N)]-np.mean(s1y[int(1.5*N): int(2.5*N)]) sx_f = s1x[int(fStart): int(fEnd)]-np.mean(s1x[int(fStart): int(fEnd)]) sy_f = s1y[int(fStart): int(fEnd)]-np.mean(s1y[int(fStart): int(fEnd)]) Sx_n = 2*abs(fft(sx_n))/N Sy_n = 2*abs(fft(sy_n))/N Sx_f = 2*abs(fft(sx_f))/N Sy_f = 2*abs(fft(sy_f))/N data1 = [np.hstack((sx_n, sx_f)), np.hstack((sy_n, sy_f))]# 按水平方向(列顺序)堆叠数组构成一个新的数组 fft_data1 = [np.hstack((Sx_n, Sx_f)), np.hstack((Sy_n, Sy_f))] n = np.linspace(0, N, N) t = n/fs f = fs/N*n nSamp = len(s1x) T = np.linspace(0, nSamp, nSamp)/fs # 绘制完整时域信号 plt.figure(1) plt.subplots_adjust(hspace=0.5) plt.subplot(211) plt.plot(T, s2x) plt.axis([0, 32, -8500, -4800]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-X向') plt.subplot(212) plt.plot(T, s2y) plt.axis([0, 32, -8500, -4500]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-Y向') # 绘制时域图 plt.figure(2) plt.subplots_adjust(hspace=0.5) plt.subplot(221) plt.plot(t[1:N], sx_n[1:N]) plt.axis([0, 1.5, -700, 700]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-X向正常') plt.subplot(222) plt.plot(t[1:N], sy_n[1:N]) plt.axis([0, 1.5, -700, 700]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-Y向正常') plt.subplot(223) plt.plot(t[1:N], sx_f[1:N]) plt.axis([0, 1.5, -2000, 2000]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-X向碰摩') plt.subplot(224) plt.plot(t[1:N], sy_f[1:N]) plt.axis([0, 1.5, -2000, 2000]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-Y向碰摩') # 绘制幅频图 plt.figure(3) plt.subplots_adjust(hspace=0.5) plt.subplot(221) plt.plot(f[1:int(N/2)], Sx_n[1:int(N/2)]) plt.xlabel('频率 / Hz') plt.ylabel('幅值 / mV') plt.title('频谱-X向正常') plt.subplot(222) plt.plot(f[1:int(N/2)], Sy_n[1:int(N/2)]) plt.xlabel('频率 / Hz') plt.ylabel('幅值 / mV') plt.title('频谱-Y向正常') plt.subplot(223) plt.plot(f[1:int(N/2)], Sx_f[1:int(N/2)]) plt.xlabel('频率 / Hz') plt.ylabel('幅值 / mV') plt.title('频谱-X向碰摩') plt.subplot(224) plt.plot(f[1:int(N/2)], Sy_f[1:int(N/2)]) plt.xlabel('频率 / Hz') plt.ylabel('幅值 / mV') plt.title('频谱-Y向碰摩') # 绘制轴心轨迹图 plt.figure(4) plt.subplots_adjust(hspace=0.5) plt.subplot(211) plt.plot(sx_n, sy_n, 'b-') plt.xlabel('X向电压 / mV') plt.ylabel('Y向电压 / mV') plt.title('轴心轨迹-正常') plt.subplot(212) plt.plot(sx_f, sy_f, 'b-') plt.xlabel('X向电压 / mV') plt.ylabel('Y向电压 / mV') plt.title('轴心轨迹-碰摩') #n为正常,f为故障,原始数据的第三列和第四列进行傅里叶变换 sx_n = s2x[int(1.5*N): int(2.5*N)]-np.mean(s2x[int(1.5*N): int(2.5*N)]) sy_n = s2y[int(1.5*N): int(2.5*N)]-np.mean(s2y[int(1.5*N): int(2.5*N)]) sx_f = s2x[int(fStart): int(fEnd)]-np.mean(s2x[int(fStart): int(fEnd)]) sy_f = s2y[int(fStart): int(fEnd)]-np.mean(s2y[int(fStart): int(fEnd)]) Sx_n = 2*abs(fft(sx_n))/N Sx_f = 2*abs(fft(sx_f))/N Sy_n = 2*abs(fft(sy_n))/N Sy_f = 2*abs(fft(sy_f))/N data2 = [np.hstack((sx_n, sx_f)), np.hstack((sy_n, sy_f))] fft_data2 = [np.hstack((Sx_n, Sx_f)), np.hstack((Sy_n, Sy_f))] # 绘制完整时域信号 plt.figure(5) plt.subplots_adjust(hspace=0.5) plt.subplot(211) plt.plot(T, s1x) plt.axis([0, 32, -8500, -4800]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-X向') plt.subplot(212) plt.plot(T, s1y) plt.axis([0, 32, -8500, -4500]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-Y向') # 绘制时域图 plt.figure(6) plt.subplots_adjust(hspace=0.5) plt.subplot(221) plt.plot(t[1:N], sx_n[1:N]) plt.axis([0, 1.5, -700, 700]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-X向正常') plt.subplot(222) plt.plot(t[1:N], sy_n[1:N]) plt.axis([0, 1.5, -700, 700]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-Y向正常') plt.subplot(223) plt.plot(t[1:N], sx_f[1:N]) plt.axis([0, 1.5, -2000, 2000]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-X向碰摩') plt.subplot(224) plt.plot(t[1:N], sy_f[1:N]) plt.axis([0, 1.5, -2000, 2000]) plt.xlabel('时间 / s') plt.ylabel('电压 / mV') plt.title('时域信号-Y向碰摩') # 绘制幅频图 plt.figure(7) plt.subplots_adjust(hspace=0.5) plt.subplot(221) plt.plot(f[1:int(N/2)], Sx_n[1:int(N/2)]) plt.xlabel('频率 / Hz') plt.ylabel('幅值 / mV') plt.title('频谱-X向正常') plt.subplot(222) plt.plot(f[1:int(N/2)], Sy_n[1:int(N/2)]) plt.xlabel('频率 / Hz') plt.ylabel('幅值 / mV') plt.title('频谱-Y向正常') plt.subplot(223) plt.plot(f[1:int(N/2)], Sx_f[1:int(N/2)]) plt.xlabel('频率 / Hz') plt.ylabel('幅值 / mV') plt.title('频谱-X向碰摩') plt.subplot(224) plt.plot(f[1:int(N/2)], Sy_f[1:int(N/2)]) plt.xlabel('频率 / Hz') plt.ylabel('幅值 / mV') plt.title('频谱-Y向碰摩') # 绘制轴心轨迹图 plt.figure(8) plt.subplots_adjust(hspace=0.5) plt.subplot(211) plt.plot(sx_n, sy_n, 'b-') plt.xlabel('X向电压 / mV') plt.ylabel('Y向电压 / mV') plt.title('轴心轨迹-正常') plt.subplot(212) plt.plot(sx_f, sy_f, 'b-') plt.xlabel('X向电压 / mV') plt.ylabel('Y向电压 / mV') plt.title('轴心轨迹-碰摩') data = [[data1[0], data1[1], data2[0], data2[1]]] fft_data = [fft_data1[0], fft_data1[1], fft_data2[0], fft_data2[1]] #生成原始测试数据集 with open("data_test.txt", 'w') as f: for i in range(0, 2*N): for j in range(0, 4): f.write(str(data[j][i])) f.write(',') f.write('\n') #生成fft数据集 with open("fft_data_test.txt", 'w') as f: for i in range(0, 2*N): for j in range(0, 4): f.write(str(fft_data[j][i])) f.write(',') f.write('\n') plt.show() ##数据处理优化 def handleText(): faultText1=open('pm13.txt') faultTextSet1 = [] for line in faultText1.readlines(): currLine = line.strip().split('\t') lineArr = [] for i in range(len(currLine)): lineArr.append(float(currLine[i])) faultTextSet1.append(lineArr) faultTextSet1=np.array(faultTextSet1) return faultTextSet1 def handleTrain(): faultTrain1=open('data_test.txt') faultTrainSet1 = [] for line in faultTrain1.readlines(): currLine = line.strip().split(',') lineArr = [] for i in range(len(currLine)): lineArr.append(float(currLine[i])) faultTrainSet1.append(lineArr) faultTrainSet1=np.array(faultTrainSet1) return faultTrainSet1 def FFTTrain(): fftTrain1=open('fft_data_test.txt') fftTrainSet1 = [] for line in fftTrain1.readlines(): currLine = line.strip().split(',') lineArr = [] for i in range(len(currLine)): lineArr.append(float(currLine[i])) fftTrainSet1.append(lineArr) fftTrainSet1=np.array(fftTrainSet1) return fftTrainSet1 def FFTText(): fftText1=open('fft_data.txt') fftTextSet1 = [] for line in fftText1.readlines(): currLine = line.strip().split(',') lineArr = [] for i in range(len(currLine)): lineArr.append(float(currLine[i])) fftTextSet1.append(lineArr) fftTextSet1=np.array(fftTextSet1) return fftTextSet1 def mathTrain(): filename = '测试数据特征.txt' faultTrainSet=handleTrain() fftTrainSet=FFTTrain() for i in range(4): faultTrainmath = faultTrainSet[:, i] fftTrainSetmath = fftTrainSet[:, i] Sum = 0 Sum2 = 0 Sum4 = 0 Sum_2 = 0 Sum_1 = 0 sump = 0 sump2 = 0 sump3 = 0 fk2 = 0 fk4 = 0 fk = 0 sums = 0 sum1 = 0 sum2 = 0 sum3 = 0 sum4 = 0 for j in range(1666): Sum = Sum + float(faultTrainmath[[j]]) X_Average = Sum / 1666 with open(filename, 'a') as f: f.write(str(X_Average)) f.write('\n') for j in range(1666): Sum2 = Sum2 + float(faultTrainmath[[j]]) ** 2 X_rms = math.sqrt(Sum2 / 1666) #平方根函数 with open(filename, 'a') as f: f.write(str(X_rms)) f.write('\n') for j in range(1666): Sum4 = Sum4 + float(faultTrainmath[[j]]) ** 4 β = Sum4 / 1666 with open(filename, 'a') as f: f.write(str(β)) f.write('\n') for j in range(1666): Sum_2 = Sum_2 + math.sqrt(math.sqrt(faultTrainmath[[j]] ** 2)) X_r = (Sum_2 / 1666) ** 2 with open(filename, 'a') as f: f.write(str(X_r)) f.write('\n') for j in range(1666): Sum_1 = Sum_1 + (float(faultTrainmath[[j]]) - X_Average) ** 2 σ2 = Sum_1 / 1665 with open(filename, 'a') as f: f.write(str(σ2)) f.write('\n') for j in range(833): sump = sump + float(fftTrainSetmath[[j]]) p1 = sump / 833 #点数K=N/2 with open(filename, 'a') as f: f.write(str(p1)) f.write('\n') for j in range(833): sump2 = sump2 + (float(fftTrainSetmath[[j]]) - p1) ** 2 p2 = sump2 / 832 with open(filename, 'a') as f: f.write(str(p2)) f.write('\n') for j in range(833): sump3 = sump3 + (float(fftTrainSetmath[[j]]) - p1) ** 3 p3 = sump3 / (833 * (math.sqrt(p2)) ** 3) with open(filename, 'a') as f: f.write(str(p3)) f.write('\n') for j in range(833): fk4 = fk4 + float(fftTrainSetmath[[j]]) * (j+1) ** 4 fk2 = fk2 + float(fftTrainSetmath[[j]]) * (j+1) ** 2 p4 = math.sqrt(fk4 / (fk2 )) with open(filename, 'a') as f: f.write(str(p4)) f.write('\n') for j in range(833): fk = fk + (j+1) * float(fftTrainSetmath[[j]]) sums = sums + float(fftTrainSetmath[[j]]) p5 = fk / sums with open(filename, 'a') as f: f.write(str(p5)) f.write('\n') for j in range(833): sum1 = sum1 + float(fftTrainSetmath[[j]]) * (j+1 - p5) ** 2 p6 = math.sqrt(sum1 / 833) with open(filename, 'a') as f: f.write(str(p6)) f.write('\n') for j in range(833): sum2 = sum2 + (j+1) * (j+1) * float(fftTrainSetmath[[j]]) sum3 = sum3 + float(fftTrainSetmath[[j]]) sum4 = sum4 + float(fftTrainSetmath[[j]]) * (j+1) ** 4 p7 = sum2 / math.sqrt(sum3 * sum4 ) with open(filename, 'a') as f: f.write(str(p7)) f.write('\n') if i < 2: with open(filename, 'a') as f: f.write('1\n') else: with open(filename, 'a') as f: f.write('0\n') def TrainwriteNp(): b = np.loadtxt('测试数据特征.txt') b=np.array(b).reshape(int(len(b)/13), 13) np.savetxt("b.txt", b, fmt='%f', delimiter=',') def mathText(): faultTextSet=handleText() fftTextSet=FFTText() filename='数据特征.txt' for i in range(4): faultTextmath=faultTextSet[:,i] fftTextSetmath=fftTextSet[:,i] Sum = 0 Sum2 = 0 Sum4 = 0 Sum_2=0 Sum_1=0 sump=0 sump2=0 sump3=0 fk2=0 fk4=0 fk=0 sums=0 sum1=0 sum2=0 sum3=0 sum4=0 for j in range(28672): Sum=Sum+float(faultTextmath[[j]]) X_Average=Sum/28672 with open(filename,'a') as f: f.write(str(X_Average)) f.write('\n') for j in range(28672): Sum2=Sum2+float(faultTextmath[[j]])**2 X_rms=math.sqrt(Sum2/28672) with open(filename, 'a') as f: f.write(str(X_rms)) f.write('\n') for j in range(28672): Sum4=Sum4+float(faultTextmath[[j]])**4 β=Sum4/28672 with open(filename, 'a') as f: f.write(str(β)) f.write('\n') for j in range(28672): Sum_2=Sum_2+math.sqrt(math.sqrt(faultTextmath[[j]]**2)) X_r=(Sum_2/28672)**2 with open(filename, 'a') as f: f.write(str(X_r)) f.write('\n') for j in range(28672): Sum_1=Sum_1+(float(faultTextmath[[j]])-X_Average)**2 σ2=Sum_1/28671 with open(filename, 'a') as f: f.write(str(σ2)) f.write('\n') for j in range(14336): sump =sump + float(fftTextSetmath[[j]]) p1 = sump / 14336 with open(filename, 'a') as f: f.write(str(p1)) f.write('\n') for j in range(28672): sump2 = sump2+(float(fftTextSetmath[[j]])-p1)**2 p2=sump2/14335 with open(filename, 'a') as f: f.write(str(p2)) f.write('\n') for j in range(14336): sump3=sump3+(float(fftTextSetmath[[j]])-p1)**3 p3=sump3/(14336*(math.sqrt(p2))**3) with open(filename, 'a') as f: f.write(str(p3)) f.write('\n') for j in range(14336): fk4=fk4+float(fftTextSetmath[[j]])*((j+1)**4) fk2=fk2+float(fftTextSetmath[[j]])*((j+1)**2) p4=math.sqrt(fk4/(fk2)) with open(filename, 'a') as f: f.write(str(p4)) f.write('\n') for j in range(14336): fk=fk+(j+1)*float(fftTextSetmath[[j]]) sums=sums+float(fftTextSetmath[[j]]) p5=fk/sums with open(filename, 'a') as f: f.write(str(p5)) f.write('\n') for j in range(14336): sum1=sum1+float(fftTextSetmath[[j]])*((j-p5)**2) p6=math.sqrt(sum1/14336) with open(filename, 'a') as f: f.write(str(p6)) f.write('\n') for j in range(14336): sum2=sum2+(j+1)*(j+1)*float(fftTextSetmath[[j]]) sum3=sum3+float(fftTextSetmath[[j]]) sum4=sum4+float(fftTextSetmath[[j]])*((j+1)**4) p7=sum2/math.sqrt(sum3*sum4) with open(filename, 'a') as f: f.write(str(p7)) f.write('\n') if i<2: with open(filename, 'a') as f: f.write('1\n') else: with open(filename, 'a') as f: f.write('0\n') def writeNp(): a = np.loadtxt('数据特征.txt') a = np.array(a).reshape(int(len(a)/13), 13) np.savetxt("a.txt", a, fmt='%f', delimiter=',') if __name__ == '__main__': handleText() mathText() writeNp() handleTrain() mathTrain() TrainwriteNp()