# I don't know whether this GPT-stitched algorithm is correct. Could anyone point out
# the problems with it and, ideally, offer a few suggestions for fixes?
#
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
import random
from collections import namedtuple, deque

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from scipy.integrate import quad
from scipy.stats import norm
# Hyperparameters
EPISODES = 500
MAX_STEPS = 365
BUFFER_SIZE = 10000
BATCH_SIZE = 64
GAMMA = 0.99
LR = 0.001
TAU = 1e-3
UPDATE_EVERY = 4
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# DQN model
class DQN(nn.Module):
def __init__(self, input_dim, output_dim):
super(DQN, self).__init__()
self.fc1 = nn.Linear(input_dim, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, output_dim)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = torch.relu(self.fc2(x))
return self.fc3(x)
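# A quick shape sanity check for the DQN head (a sketch; the sizes mirror the
# environment's state_space=1 and action_space=10000 defined below):
#
#   net = DQN(1, 10000)
#   q = net(torch.zeros(2, 1))   # -> torch.Size([2, 10000]), one Q-value per action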
# LSTM
class LSTMModel(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
super(LSTMModel, self).__init__()
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_dim, output_dim)
def forward(self, x):
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device)
out, _ = self.lstm(x, (h0, c0))
out = self.fc(out[:, -1, :])
return out
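# With batch_first=True, nn.LSTM expects inputs of shape (batch, seq_len, input_dim).
# The environment feeds 3-step histories shaped (1, 3, 1), i.e. seq_len 3 with one
# feature per step, so the models must be built with input_dim=1 (see the
# LSTMModel(1, 32, 1, 1) constructors in DataProductEnvironment.__init__ below):
#
#   model = LSTMModel(1, 32, 1, 1).to(device)
#   y = model(torch.zeros(1, 3, 1).to(device))   # -> torch.Size([1, 1])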
# Agent
class Agent:
def __init__(self, state_size, action_size, seed):
self.state_size = state_size
self.action_size = action_size
        random.seed(seed)
self.qnetwork_local = DQN(state_size, action_size).to(device)
self.qnetwork_target = DQN(state_size, action_size).to(device)
self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=LR)
self.memory = deque(maxlen=BUFFER_SIZE)
self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
self.t_step = 0
self.action_history = []
self.loss_history = []
def step(self, state, action, reward, next_state, done):
self.memory.append(self.experience(state, action, reward, next_state, done))
self.t_step = (self.t_step + 1) % UPDATE_EVERY
if self.t_step == 0:
if len(self.memory) > BATCH_SIZE:
experiences = self.sample_experiences()
self.learn(experiences, GAMMA)
def act(self, state, eps=0.):
state = torch.from_numpy(state).float().unsqueeze(0).to(device)
self.qnetwork_local.eval()
with torch.no_grad():
action_values = self.qnetwork_local(state)
self.qnetwork_local.train()
if random.random() > eps:
chosen_action = np.argmax(action_values.cpu().data.numpy())
else:
chosen_action = random.choice(np.arange(self.action_size))
self.action_history.append(chosen_action)
return chosen_action
def learn(self, experiences, gamma):
states, actions, rewards, next_states, dones = experiences
Q_targets_next = self.qnetwork_target(next_states).detach().max(1)[0].unsqueeze(1)
Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))
Q_expected = self.qnetwork_local(states).gather(1, actions)
loss = nn.MSELoss()(Q_expected, Q_targets)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.loss_history.append(loss.item())
self.soft_update(self.qnetwork_local, self.qnetwork_target, TAU)
def soft_update(self, local_model, target_model, tau):
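        # Polyak averaging: theta_target <- tau * theta_local + (1 - tau) * theta_target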
for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)
def sample_experiences(self):
experiences = random.sample(self.memory, k=BATCH_SIZE)
states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float().to(device)
actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long().to(device)
rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float().to(device)
        dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(device)
return (states, actions, rewards, next_states, dones)
# Environment
class DataProductEnvironment:
    def __init__(self):
        self.day = 0
        self.action_space = 10000
        self.state_space = 1
        self.raw_data = []          # stores the raw data D_t
        self.price_history = []     # stores past prices
        self.quality_history = []   # stores past qualities
self.N_history = []
self.theta_history = []
self.mu_history = []
self.nt_w_history = []
self.nt_prime_history = []
self.nt_history = []
self.price_hat_history = []
self.quality_hat_history = []
        # input_dim must be 1: predict_price_quality feeds tensors shaped (1, 3, 1),
        # i.e. seq_len 3 with one feature per step; the original LSTMModel(3, ...)
        # would make nn.LSTM raise a size mismatch on that input
        self.lstm_price_model = LSTMModel(1, 32, 1, 1).to(device)
        self.lstm_quality_model = LSTMModel(1, 32, 1, 1).to(device)
self.lstm_optimizer_price = optim.Adam(self.lstm_price_model.parameters(), lr=0.001)
self.lstm_optimizer_quality = optim.Adam(self.lstm_quality_model.parameters(), lr=0.001)
self.reward_history = []
    def reset(self):
        """Reset all per-episode state; every history list must be cleared, not just
        the first three (stale hat/N/theta histories would leak across episodes)."""
        self.day = 0
        (self.raw_data, self.price_history, self.quality_history, self.N_history,
         self.theta_history, self.mu_history, self.nt_w_history, self.nt_prime_history,
         self.nt_history, self.price_hat_history, self.quality_hat_history) = ([] for _ in range(11))
        state = np.array([self.day])
        return state
    def calculate_quality(self, raw_data, m_t):
        """Quality of the current product: each recent raw-data batch D_j contributes
        D_j / T * (1 - exp(-m_t)), decayed by exp(-gamma * T * age). The signature now
        takes a data window plus a single m_t, matching the call in step()."""
        T = 1
        gamma = 0.7
        quality = 0.0
        n = len(raw_data)
        for j in range(n):
            quality += raw_data[j] / T * (1 - np.exp(-m_t)) * np.exp(-gamma * T * (n - 1 - j))
        return quality
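    # Worked example (assumed numbers): with raw_data = [11.0, 10.5, 10.2] and m_t = 0.5,
    # quality ≈ 10.2*(1-e^-0.5) + 10.5*(1-e^-0.5)*e^-0.7 + 11.0*(1-e^-0.5)*e^-1.4
    #         ≈ 4.01 + 2.05 + 1.07 ≈ 7.13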
    # LSTM one-step-ahead forecasts of price and quality
    def predict_price_quality(self):
        if len(self.price_history) < 3 or len(self.quality_history) < 3:
            return 0.0, 0.0  # early exit: plain Python numbers have no .item()
        price_input = torch.tensor(self.price_history[-3:]).view(1, -1, 1).float().to(device)
        quality_input = torch.tensor(self.quality_history[-3:]).view(1, -1, 1).float().to(device)
        self.lstm_price_model.eval()
        self.lstm_quality_model.eval()
        with torch.no_grad():
            price_prediction = self.lstm_price_model(price_input)
            quality_prediction = self.lstm_quality_model(quality_input)
        self.lstm_price_model.train()
        self.lstm_quality_model.train()
        return price_prediction.item(), quality_prediction.item()
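    # Note: lstm_optimizer_price / lstm_optimizer_quality are created in __init__ but
    # never stepped, so both forecasters keep their random initial weights. A minimal
    # online-training sketch (an assumed design, not part of the original: fit the map
    # "previous 3 prices -> latest price" once enough history exists):
    #
    #   if len(self.price_history) >= 4:
    #       x = torch.tensor(self.price_history[-4:-1]).view(1, -1, 1).float().to(device)
    #       y = torch.tensor([[self.price_history[-1]]]).float().to(device)
    #       loss = nn.MSELoss()(self.lstm_price_model(x), y)
    #       self.lstm_optimizer_price.zero_grad()
    #       loss.backward()
    #       self.lstm_optimizer_price.step()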
    '''
    Please check whether the computations of nt, nt_w and nt_prime below are correct.
    '''
    # Consumer valuation theta_t ~ Normal(mu_t, sigma_2), where sigma_2 is the std
def theta_t(self):
epsilon_1 = 0.5
epsilon_2 = random.uniform(0, 0.15)
if len(self.quality_history) == 0:
mu_t = 0.5
else:
mu_t = epsilon_1 * self.quality_history[-1] + epsilon_2
sigma_2 = 0.1
theta_t = np.random.normal(mu_t, sigma_2)
return theta_t, mu_t, sigma_2
def f(self, theta_t, mu_t, sigma_2):
        # pdf of the normal distribution
return norm.pdf(theta_t, loc=mu_t, scale=sigma_2)
def integrand1(self, theta_t, price_t, price_hat_t, quality_t, quality_hat_t, mu_t, sigma_2):
return self.f(theta_t, mu_t, sigma_2)
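    # Since integrand1 reduces to the N(mu_t, sigma_2) pdf, each integral below is a
    # plain probability mass, so quad(...) is equivalent to
    #   norm.cdf(upper, loc=mu_t, scale=sigma_2) - norm.cdf(lower, loc=mu_t, scale=sigma_2),
    # which would be cheaper and numerically safer (an observation, not a required change).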
    def calculate_nt(self):
        # n_t: probability mass of theta_t in [(p_t - p_hat_t)/(q_t - q_hat_t), 1]
        theta_t, mu_t, sigma_2 = self.theta_t()
        price_t = self.price_history[-1]
        price_hat_t = self.price_hat_history[-1]
        quality_t = self.quality_history[-1]
        quality_hat_t = self.quality_hat_history[-1]
        denom = quality_t - quality_hat_t
        if abs(denom) < 1e-8:
            return 0.0  # guard: equal qualities make the threshold undefined
        lower_limit = (price_t - price_hat_t) / denom
        upper_limit = 1
        integral1, _ = quad(self.integrand1, lower_limit, upper_limit,
                            args=(theta_t, price_t, price_hat_t, quality_t, quality_hat_t, mu_t, sigma_2))
        return integral1
    def calculate_nt_w(self):
        # n_t^w: probability mass of theta_t in [p_hat_t/q_hat_t, (p_t - p_hat_t)/(q_t - q_hat_t)]
        theta_t, mu_t, sigma_2 = self.theta_t()
        price_t = self.price_history[-1]
        price_hat_t = self.price_hat_history[-1]
        quality_t = self.quality_history[-1]
        quality_hat_t = self.quality_hat_history[-1]
        denom = quality_t - quality_hat_t
        if abs(quality_hat_t) < 1e-8 or abs(denom) < 1e-8:
            return 0.0  # guard: division by zero in the limits (e.g. hat quality still 0)
        lower_limit = price_hat_t / quality_hat_t
        upper_limit = (price_t - price_hat_t) / denom
        integral2, _ = quad(self.integrand1, lower_limit, upper_limit,
                            args=(theta_t, price_t, price_hat_t, quality_t, quality_hat_t, mu_t, sigma_2))
        return integral2
def theta_old(self):
theta_old_t = self.theta_history[-2] if len(self.theta_history) >= 2 else 0
mu_old_t = self.mu_history[-2] if len(self.mu_history) >= 2 else 0
sigma_3 = 0.1
return theta_old_t, mu_old_t, sigma_3
def fo(self, theta_old_t, mu_old_t, sigma_3):
return norm.pdf(theta_old_t, loc=mu_old_t, scale=sigma_3)
def integrand2(self, theta_old_t, price_t, price_old_t, quality_t, quality_old_t, mu_old_t, sigma_3):
return self.fo(theta_old_t, mu_old_t, sigma_3)
    def calculate_nt_prime(self):
        # n_t': probability mass of theta_old in [p_t/q_t, (p_old - p_t)/(q_old - q_t)]
        theta_old_t, mu_old_t, sigma_3 = self.theta_old()
        price_t = self.price_history[-1]
        price_old_t = self.price_history[-2] if len(self.price_history) >= 2 else 0
        quality_t = self.quality_history[-1]
        quality_old_t = self.quality_history[-2] if len(self.quality_history) >= 2 else 0
        denom = quality_old_t - quality_t
        if abs(quality_t) < 1e-8 or abs(denom) < 1e-8:
            return 0.0  # guard: division by zero in the limits
        lower_limit = price_t / quality_t
        upper_limit = (price_old_t - price_t) / denom
        integral3, _ = quad(self.integrand2, lower_limit, upper_limit,
                            args=(theta_old_t, price_t, price_old_t, quality_t, quality_old_t, mu_old_t, sigma_3))
        return integral3
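    # My reading of the three segments (worth double-checking against the intended
    # model): n_t = buyers of the current product, n_t^w = consumers who wait for the
    # forecast (hat) product, n_t' = returning buyers of the previous version; each is
    # the probability mass of theta over its valuation interval.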
    # Methods for customer arrivals and the expected reward
    # Arrivals are Poisson with rate lambda_t = alpha*ln(q_t) + beta*exp(-p_t)
    def generate_N_t(self, price_t, quality_t):
        alpha = 100
        beta = 130
        lambda_t = alpha * np.log(max(quality_t, 1e-6)) + beta * np.exp(-price_t)
        lambda_t = max(lambda_t, 0.0)  # guard: np.random.poisson requires lam >= 0
        N_t = np.random.poisson(lambda_t)
        return N_t
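    # Worked example (assumed values): quality_t ≈ 7.13 and price_t ≈ 0.11 give
    # lambda_t ≈ 100*ln(7.13) + 130*e^(-0.11) ≈ 196.4 + 116.5 ≈ 313 arrivals per period.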
    def calculate_expected_reward(self, N_t, nt_w, nt_prime, n_t, price_hat_t, quality_hat_t, price_t, cost_t):
        # Lagged quantities from period t-1: arrivals (a), waiting share (b), returning share (c)
        a = self.N_history[-2] if len(self.N_history) >= 2 else 0
        b = self.nt_w_history[-2] if len(self.nt_w_history) >= 2 else 0
        c = self.nt_prime_history[-2] if len(self.nt_prime_history) >= 2 else 0
        E_reward_t = (
            (a * b) * c + n_t * N_t
        ) * price_t + (nt_w * N_t * price_hat_t) - cost_t
        reward_t = E_reward_t
        self.day += 1
        done = (self.day >= MAX_STEPS)
        state = np.array([self.day])
        return state, reward_t, done  # NOTE: this is the expectation E_reward_t, not a realized reward
def decode_action(self, action):
m_t = action // 100
price = action % 100
m_t_normalized = m_t / 99
price_normalized = price / 99
return m_t_normalized, price_normalized
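    # Worked example: action = 4711 -> m_t index 47, price index 11, normalized to
    # m_t = 47/99 ≈ 0.475 and price = 11/99 ≈ 0.111; both axes share the same 0..99
    # grid, so 100*100 = 10000 = action_space.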
    def step(self, action):
        m_t, price_t = self.decode_action(action)  # NOTE: m_t currently shares the same 0..99 grid as price; its range may need to be set separately
        self.price_history.append(price_t)
        if len(self.price_history) > 3:
            self.price_history.pop(0)  # keep only the last 3 periods of prices
        # Raw data arrival: D_t = d + phi, phi ~ Normal(mu_1, sigma_1)
        d = 10
        mu_1 = 1
        sigma_1 = 0.5
        phi = np.random.normal(mu_1, sigma_1)
        D_t = d + phi
        self.raw_data.append(D_t)
        if len(self.raw_data) > 3:
            self.raw_data.pop(0)  # keep only the last 3 periods of raw data
        # Quality of the current product
        quality_t = self.calculate_quality(self.raw_data[-3:], m_t)
        self.quality_history.append(quality_t)
        # Costs: storage cost C_t (oldest batch excluded once the window is full) plus processing cost Z_t
        c = 3
        C_t = c * (sum(self.raw_data) - (self.raw_data[-3] if len(self.raw_data) == 3 else 0))
        w = 4
        Z_t = w * m_t * D_t
        cost_t = C_t + Z_t
        # Customer arrivals N_t
        N_t = self.generate_N_t(price_t, quality_t)
        self.N_history.append(N_t)
        # LSTM forecasts must be computed before the n_t integrals below, because
        # calculate_nt / calculate_nt_w read price_hat_history[-1] and
        # quality_hat_history[-1]; in the original ordering these lists were still
        # empty on the first step, which raised an IndexError
        price_hat_t, quality_hat_t = self.predict_price_quality()
        self.price_hat_history.append(price_hat_t)
        self.quality_hat_history.append(quality_hat_t)
        # Consumer segmentation: theta_t, n_t^w, n_t', n_t
        theta_t, mu_t, sigma_2 = self.theta_t()
        nt_w = self.calculate_nt_w()
        nt_prime = self.calculate_nt_prime()
        n_t = self.calculate_nt()
        # Record histories
        self.theta_history.append(theta_t)
        self.mu_history.append(mu_t)
        self.nt_w_history.append(nt_w)
        self.nt_prime_history.append(nt_prime)
        self.nt_history.append(n_t)
        # Expected reward; this call also advances the day and builds the next state
        state, reward_t, done = self.calculate_expected_reward(
            N_t, nt_w, nt_prime, n_t, price_hat_t, quality_hat_t, price_t, cost_t
        )
        self.reward_history.append(reward_t)
        return state, reward_t, done
# Main loop
def main():
    env = DataProductEnvironment()  # the Agent keeps its own action/loss histories
    agent = Agent(state_size=env.state_space, action_size=env.action_space, seed=0)
    scores = []
    # Epsilon-greedy schedule (added): with the default eps=0., act() would never explore
    eps, eps_end, eps_decay = 1.0, 0.01, 0.995
    for episode in range(EPISODES):
        state = env.reset()
        score = 0
        for step in range(MAX_STEPS):
            action = agent.act(state, eps)
            next_state, reward, done = env.step(action)
            agent.step(state, action, reward, next_state, done)
            state = next_state
            score += reward
            if done:
                break
        eps = max(eps_end, eps_decay * eps)
        scores.append(score)
        print(f"Episode {episode + 1}: {score}")
    torch.save(agent.qnetwork_local.state_dict(), 'model.pth')
    return agent, scores, env.action_space
if __name__ == "__main__":
agent, scores, action_space = main()
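# matplotlib is imported above but never used; a minimal sketch for inspecting
# training progress once main() has returned:
#
#   plt.plot(scores)
#   plt.xlabel('Episode')
#   plt.ylabel('Score')
#   plt.title('DQN score per episode')
#   plt.show()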