我写了一个GRU的时序模型预测的代码,当我通过30天的7个特征数据,预测第31天的7个维度数据。模型训练和预测的效果还可以。
但是当我开始利用滚动预测,也就是通过预测出来的31天的数据,填充进去再把第一天的数据剔除并预测,效果就开始差了。。
。具体的代码如下:不知道哪里有问题。
import torch
import torch.nn as nn
import numpy as np
import yfinance as yf
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
# 下载并预处理数据
def download_preprocess_data(ticker, start_date, end_date):
data = yf.download(ticker, start=start_date, end=end_date)
data = data[['Open', 'Close', 'Volume']]
data['MA5'] = data['Close'].rolling(window=5).mean()
data['MA10'] = data['Close'].rolling(window=10).mean()
data['MA20'] = data['Close'].rolling(window=20).mean()
data['EMA'] = data['Close'].ewm(span=20, adjust=False).mean()
data.dropna(inplace=True)
return data
def normalize_data(data):
scaler = MinMaxScaler(feature_range=(0, 1))
return scaler.fit_transform(data), scaler
def create_sequences(data, look_back):
X, y = [], []
max_index = len(data) - look_back
for i in range(max_index):
X.append(data[i:(i + look_back)]) # 特征窗口:从第i天到第i+look_back-1天
y_target = data[i + look_back] # 预测下一个时间步的所有7个特征
y.append(y_target)
return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)
data = download_preprocess_data('AAPL', '2014-01-01', datetime.now().strftime('%Y-%m-%d'))
# x_data, y_data = create_sequences(data.values, look_back = 30, steps_ahead=30)
scaled_data, scaler = normalize_data(data.values)
look_back = 30
x_data, y_data = create_sequences(scaled_data, look_back)
# Split data into train and test sets
train_size = int(len(x_data) * 0.8)
x_train, x_test = x_data[:train_size], x_data[train_size:]
y_train, y_test = y_data[:train_size], y_data[train_size:]
print("x_train shape:", x_train.shape)
print("y_train shape:", y_train.shape)
print("Data types:", x_train.dtype, y_train.dtype)
# Convert to PyTorch tensors
train_dataset = TensorDataset(torch.from_numpy(x_train).float(), torch.from_numpy(y_train).float())
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=False)
# Define the GRU model
class GRUModel(nn.Module):
def __init__(self, input_size, hidden_size1=256, hidden_size2=128, output_size=7):
super(GRUModel, self).__init__()
self.gru1 = nn.GRU(input_size, hidden_size1, num_layers=1, batch_first=True)
self.gru2 = nn.GRU(hidden_size1, hidden_size2, num_layers=1, batch_first=True)
self.dropout = nn.Dropout(0.2)
self.dense = nn.Linear(hidden_size2, output_size)
def forward(self, x):
gru_out1, _ = self.gru1(x)
gru_out1 = self.dropout(gru_out1)
gru_out2, _ = self.gru2(gru_out1)
gru_out2 = gru_out2[:, -1, :] # 只取最后一个时间步的输出
predictions = self.dense(gru_out2)
return predictions
model = GRUModel(input_size=x_train.shape[2], hidden_size1=256, hidden_size2=128, output_size=7)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
# Train the model
epochs = 1
for epoch in range(epochs):
for inputs, targets in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets.unsqueeze(1))
loss.backward()
optimizer.step()
print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item()}')
model.eval()
# 预测x_test的全部数据
with torch.no_grad():
full_test_predictions = model(torch.from_numpy(x_test).float()) # 预测所有测试数据
# 准备滚动预测的初始输入:使用x_test的最后一个样本
last_inputs = x_test[-1] # 这应该是一个[30, 7]的数组
# 使用模型进行初始预测
with torch.no_grad():
last_inputs_tensor = torch.from_numpy(last_inputs).float().unsqueeze(0) # 添加批次维度
initial_prediction = model(last_inputs_tensor).numpy()
# 更新第30天的数据
current_input = np.vstack([last_inputs[1:], initial_prediction]) # 移除第一天,添加新预测的天
predicted_features = [initial_prediction.flatten()] # 保存初始预测结果
# 滚动预测30次
for _ in range(30):
with torch.no_grad():
# 对当前输入进行预测
current_input_tensor = torch.from_numpy(current_input[np.newaxis, :, :]).float()
prediction = model(current_input_tensor).numpy()
# 更新预测结果列表
predicted_features.append(prediction.flatten())
# 更新输入数据:移除第一个时间步,添加新的预测结果
current_input = np.vstack([current_input[1:], prediction])
# 将预测结果转换为numpy数组
predicted_features = np.array(predicted_features)
print(predicted_features)
# 反归一化处理
full_predictions = np.zeros((predicted_features.shape[0], scaled_data.shape[1]))
full_predictions[:, 1] = predicted_features[:, 1] # 取出Close价格特征
predictions_inv = scaler.inverse_transform(full_predictions)[:, 1]
# 准备绘图数据
start_date = data.index[train_size + look_back]
valid_dates = pd.date_range(start=start_date, periods=len(predicted_features), freq='D')
# 创建DataFrame
predictions_df = pd.DataFrame({
'Predictions': predictions_inv,
'Actual': data['Close'][train_size + look_back:train_size + look_back + len(predicted_features)].values
}, index=valid_dates)
# Visualization
plt.figure(figsize=(16, 8))
plt.title('Future Close Price Predictions Including Rolling Predictions')
plt.xlabel('Date')
plt.ylabel('Close Price USD ($)')
# 绘制预测结果和实际数据
plt.plot(predictions_df['Predictions'], label='Predicted Future Close', color='red', linestyle='--')
plt.plot(predictions_df['Actual'], label='Actual Close Price', color='blue', alpha=0.5)
plt.legend()
plt.show()