最近想跑一个简单的LSTM的模型,但是预测结果总是很差,不知道问题处在了哪里?
import yfinance as yf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime
# 获取股票数据
df = yf.download('AAPL', start='2012-01-01', end=datetime.now().strftime('%Y-%m-%d'))
# 选择特征:Open, High, Low, Close, Volume
data = df[['Open', 'High', 'Low', 'Close', 'Volume']]
# 计算移动平均线(如5日、10日、20日)
data['MA5'] = data['Close'].rolling(window=5).mean()
data['MA10'] = data['Close'].rolling(window=10).mean()
data['MA20'] = data['Close'].rolling(window=20).mean()
# 计算指数平滑移动平均线(EMA)
data['EMA'] = data['Close'].ewm(span=20, adjust=False).mean()
# 计算相对强弱指数(RSI)
delta = data['Close'].diff(1)
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.rolling(window=14).mean()
avg_loss = loss.rolling(window=14).mean()
rs = avg_gain / avg_loss
data['RSI'] = 100 - (100 / (1 + rs))
# 移除缺失值
data.dropna(inplace=True)
# 数据归一化
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)
# 定义窗口大小
look_back = 60
# 创建特征和目标
x_data = []
y_data = []
for i in range(look_back, len(scaled_data)):
x_data.append(scaled_data[i-look_back:i])
y_data.append(scaled_data[i, 3]) # 目标是收盘价
# 转换为numpy数组
x_data, y_data = np.array(x_data), np.array(y_data)
# 划分训练和测试数据
train_size = int(len(x_data) * 0.8)
x_train, x_test = x_data[:train_size], x_data[train_size:]
y_train, y_test = y_data[:train_size], y_data[train_size:]
# 重塑数据为LSTM的输入形状
x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], x_train.shape[2]))
x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], x_test.shape[2]))
import torch
from torch.utils.data import DataLoader, Dataset
class StockDataset(Dataset):
def __init__(self, features, targets):
self.features = features
self.targets = targets
def __len__(self):
return len(self.features)
def __getitem__(self, index):
return self.features[index], self.targets[index]
train_dataset = StockDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_dataset = StockDataset(x_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
import torch.nn as nn
class LSTMModel(nn.Module):
def __init__(self, input_size, hidden_size1=128, hidden_size2=64, output_size=1):
super(LSTMModel, self).__init__()
self.lstm1 = nn.LSTM(input_size, hidden_size1, num_layers=1, batch_first=True)
self.lstm2 = nn.LSTM(hidden_size1, hidden_size2, num_layers=1, batch_first=True)
self.dense = nn.Linear(hidden_size2, output_size)
def forward(self, x):
lstm_out1, _ = self.lstm1(x)
lstm_out2, _ = self.lstm2(lstm_out1)
lstm_out2 = lstm_out2[:, -1, :] # 只取最后一个时间步的输出
predictions = self.dense(lstm_out2)
return predictions
input_size = x_train.shape[2]
model = LSTMModel(input_size=input_size)
import torch.optim as optim
def train_model(model, train_loader, num_epochs=100, lr=0.001):
loss_function = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
for epoch in range(num_epochs):
model.train()
for inputs, targets in train_loader:
inputs, targets = inputs.float(), targets.float()
optimizer.zero_grad()
outputs = model(inputs)
loss = loss_function(outputs, targets)
loss.backward()
optimizer.step()
print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')
train_model(model, train_loader)
model.eval()
predictions = []
with torch.no_grad():
for inputs, _ in test_loader:
inputs = inputs.float()
outputs = model(inputs)
predictions.append(outputs.numpy())
# 连接预测结果
predictions = np.concatenate(predictions)
# 反归一化
predictions = scaler.inverse_transform(np.concatenate((np.zeros((predictions.shape[0], scaled_data.shape[1] - 1)), predictions.reshape(-1, 1)), axis=1))[:, -1]
# 可视化
train = data.iloc[:train_size + look_back]
valid = data.iloc[train_size + look_back:]
valid['Predictions'] = predictions
plt.figure(figsize=(16,8))
plt.title('Model')
plt.xlabel('Date')
plt.ylabel('Close Price USD ($)')
plt.plot(train['Close'])
plt.plot(valid[['Close', 'Predictions']])
plt.legend(['Train', 'Val', 'Predictions'], loc='lower right')
plt.show()
结果很差: