bsl954033929 2023-04-05 13:54 采纳率: 0%
浏览 8

强化学习做数据产品定价的代码逻辑问题

#我不知道GPT拼接的算法的正确性,有没有大佬帮我指出算法的问题,最好能提供丢丢修改意见~
#

import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

import numpy as np
from scipy.integrate import quad
import torch
import seaborn as sns
import torch.nn as nn
import torch.optim as optim
import random
from collections import namedtuple, deque
import matplotlib.pyplot as plt
import torch.nn.functional as F
from scipy.stats import norm
import scipy.stats as stats
from scipy.integrate import quad
from scipy.stats import norm


# 设定超参数
EPISODES = 500
MAX_STEPS = 365
BUFFER_SIZE = 10000
BATCH_SIZE = 64
GAMMA = 0.99
LR = 0.001
TAU = 1e-3
UPDATE_EVERY = 4

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# DQN模型
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# LSTM
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super(LSTMModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device)

        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out


# Agent
class Agent:
    def __init__(self, state_size, action_size, seed):
        self.state_size = state_size
        self.action_size = action_size
        self.seed = random.seed(seed)

        self.qnetwork_local = DQN(state_size, action_size).to(device)
        self.qnetwork_target = DQN(state_size, action_size).to(device)
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=LR)

        self.memory = deque(maxlen=BUFFER_SIZE)
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

        self.t_step = 0

        self.action_history = []
        self.loss_history = []

    def step(self, state, action, reward, next_state, done):

        self.memory.append(self.experience(state, action, reward, next_state, done))

        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step == 0:
            if len(self.memory) > BATCH_SIZE:
                experiences = self.sample_experiences()
                self.learn(experiences, GAMMA)

    def act(self, state, eps=0.):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        self.qnetwork_local.eval()
        with torch.no_grad():
            action_values = self.qnetwork_local(state)
        self.qnetwork_local.train()

        if random.random() > eps:
            chosen_action = np.argmax(action_values.cpu().data.numpy())
        else:
            chosen_action = random.choice(np.arange(self.action_size))
        self.action_history.append(chosen_action)
        return chosen_action

    def learn(self, experiences, gamma):
        states, actions, rewards, next_states, dones = experiences

        Q_targets_next = self.qnetwork_target(next_states).detach().max(1)[0].unsqueeze(1)
        Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))

        Q_expected = self.qnetwork_local(states).gather(1, actions)

        loss = nn.MSELoss()(Q_expected, Q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.loss_history.append(loss.item())

        self.soft_update(self.qnetwork_local, self.qnetwork_target, TAU)

    def soft_update(self, local_model, target_model, tau):
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)

    def sample_experiences(self):
        experiences = random.sample(self.memory, k=BATCH_SIZE)

        states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float().to(device)
        actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long().to(device)
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float().to(
            device)
        dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(
            device)

        return (states, actions, rewards, next_states, dones)

# 环境
class DataProductEnvironment:

    def __init__(self):
        self.day = 0
        self.action_space = 10000
        self.state_space = 1
        self.raw_data = []  # 新增,用于存储原始数据D_t
        self.price_history = []  # 新增,用于存储历史价格数据
        self.quality_history = []  # 新增,用于存储历史质量数据
        self.N_history = []
        self.theta_history = []
        self.mu_history = []
        self.nt_w_history = []
        self.nt_prime_history = []
        self.nt_history = []
        self.price_hat_history = []
        self.quality_hat_history = []
        self.lstm_price_model = LSTMModel(3, 32, 1, 1).to(device)
        self.lstm_quality_model = LSTMModel(3, 32, 1, 1).to(device)
        self.lstm_optimizer_price = optim.Adam(self.lstm_price_model.parameters(), lr=0.001)
        self.lstm_optimizer_quality = optim.Adam(self.lstm_quality_model.parameters(), lr=0.001)
        self.reward_history = []

    '''需要对哪些重置'''
    def reset(self):
        self.day = 0
        self.raw_data = []  # 在环境重置时清空原始数据
        self.price_history = []  # 在环境重置时清空历史价格数据
        self.quality_history = []  # 在环境重置时清空历史质量数据

        state = np.array([self.day])
        return state

    '''这里的m_t_values可能有问题,需要改成m_t'''
    def calculate_quality(self, m_t_values):
        T = 1
        gamma = 0.7

        qualities = []

        for i, m_t in enumerate(m_t_values):
            # D_t = self.raw_data[i]
            quality = 0
            for j in range(max(0, i - 2), i + 1):
                quality += self.raw_data[j] / T * (1 - np.exp(-m_t)) * np.exp(-gamma * T * (i - j))
            qualities.append(quality)

        return sum(qualities)


    # 计算LSTM预测的价格和质量
    def predict_price_quality(self):
        if len(self.price_history) < 3 or len(self.quality_history) < 3:
            price_prediction, quality_prediction = 0, 0
        else:
            price_input = torch.tensor(self.price_history[-3:]).view(1, -1, 1).float().to(device)
            quality_input = torch.tensor(self.quality_history[-3:]).view(1, -1, 1).float().to(device)

            self.lstm_price_model.eval()
            self.lstm_quality_model.eval()

            with torch.no_grad():
                price_prediction = self.lstm_price_model(price_input)
                quality_prediction = self.lstm_quality_model(quality_input)

            self.lstm_price_model.train()
            self.lstm_quality_model.train()

        return price_prediction.item(), quality_prediction.item()

    '''
    这个部分需要检查对于nt、nt_w、nt_prime的计算是否正确
    '''

    # 对各个部分的消费者比例进行计算
    def theta_t(self):
        epsilon_1 = 0.5
        epsilon_2 = random.uniform(0, 0.15)
        if len(self.quality_history) == 0:
            mu_t = 0.5
        else:
            mu_t = epsilon_1 * self.quality_history[-1] + epsilon_2
        sigma_2 = 0.1
        theta_t = np.random.normal(mu_t, sigma_2)
        return theta_t, mu_t, sigma_2


    def f(self, theta_t, mu_t, sigma_2):
        # 正态分布的概率密度函数
        return norm.pdf(theta_t, loc=mu_t, scale=sigma_2)

    def integrand1(self, theta_t, price_t, price_hat_t, quality_t, quality_hat_t, mu_t, sigma_2):
        return self.f(theta_t, mu_t, sigma_2)

    def calculate_nt(self):
        theta_t, mu_t, sigma_2 = self.theta_t()
        price_t = self.price_history[-1]
        price_hat_t = self.price_hat_history[-1]
        quality_t = self.quality_history[-1]
        quality_hat_t = self.quality_hat_history[-1]

        lower_limit = (price_t - price_hat_t) / (quality_t - quality_hat_t)
        upper_limit = 1

        integral1, _ = quad(self.integrand1, lower_limit, upper_limit,
                           args=(theta_t, price_t, price_hat_t, quality_t, quality_hat_t, mu_t, sigma_2))
        return integral1

    def calculate_nt_w(self):
        theta_t, mu_t, sigma_2 = self.theta_t()
        price_t = self.price_history[-1]
        price_hat_t = self.price_hat_history[-1]
        quality_t = self.quality_history[-1]
        quality_hat_t = self.quality_hat_history[-1]

        lower_limit = price_hat_t / quality_hat_t
        upper_limit = (price_t - price_hat_t) / (quality_t - quality_hat_t)

        integral2, _ = quad(self.integrand1, lower_limit, upper_limit,
                           args=(theta_t, price_t, price_hat_t, quality_t, quality_hat_t, mu_t, sigma_2))
        return integral2

    def theta_old(self):
        theta_old_t = self.theta_history[-2] if len(self.theta_history) >= 2 else 0
        mu_old_t = self.mu_history[-2] if len(self.mu_history) >= 2 else 0
        sigma_3 = 0.1
        return theta_old_t, mu_old_t, sigma_3

    def fo(self, theta_old_t, mu_old_t, sigma_3):
        return norm.pdf(theta_old_t, loc=mu_old_t, scale=sigma_3)

    def integrand2(self, theta_old_t, price_t, price_old_t, quality_t, quality_old_t, mu_old_t, sigma_3):
        return self.fo(theta_old_t, mu_old_t, sigma_3)

    def calculate_nt_prime(self):
        theta_old_t, mu_old_t, sigma_3 = self.theta_old()
        price_t = self.price_history[-1]
        price_old_t = self.price_history[-2] if len(self.price_history) >= 2 else 0
        quality_t = self.quality_history[-1]
        quality_old_t = self.quality_history[-2] if len(self.quality_history) >= 2 else 0

        lower_limit = price_t / quality_t
        upper_limit = (price_old_t - price_t) / (quality_old_t - quality_t)

        integral3, _ = quad(self.integrand2, lower_limit, upper_limit,
                           args=(theta_old_t, price_t, price_old_t, quality_t, quality_old_t, mu_old_t, sigma_3))
        return integral3

    # 计算历史数据和期望收益的方法
    def generate_N_t(self, price_t, quality_t):
        alpha = 100
        beta = 130
        lambda_t = alpha * np.log(quality_t) + beta * np.exp(-price_t)
        N_t = np.random.poisson(lambda_t)
        return N_t

    def calculate_expected_reward(self, N_t, nt_w, nt_prime, n_t, price_hat_t, quality_hat_t, price_t, cost_t):
        a = self.N_history[-2] if len(self.N_history) >= 2 else 0
        b = self.nt_w_history[-2] if len(self.nt_w_history) >= 2 else 0
        c = self.nt_prime_history[-2] if len(self.nt_prime_history) >= 2 else 0
        E_reward_t = (
                             (a * b) * c + n_t * N_t
                     ) * price_t + (nt_w * N_t * price_hat_t) - cost_t
        # return E_reward_t
        reward_t = E_reward_t

        self.day += 1
        done = (self.day >= MAX_STEPS)
        state = np.array([self.day])

        return state, reward_t, done  # 这里返回感觉应该是reward,但是我写的是期望的E_reward_t

    def decode_action(self, action):
        m_t = action // 100
        price = action % 100
        m_t_normalized = m_t / 99
        price_normalized = price / 99
        return m_t_normalized, price_normalized

    def step(self, action):
        m_t, price_t = self.decode_action(action)  # 现在的m_t和quality保持了相同的阈值,需要重新设置
        self.price_history.append(price_t)  # 存储历史价格数据

        if len(self.price_history) > 3:
            self.price_history.pop(0)  # 只保留最近3个周期的价格数据

        d = 10
        mu_1 = 1
        sigma_1 = 0.5
        phi = np.random.normal(mu_1, sigma_1)
        D_t = d + phi
        self.raw_data.append(D_t)  # 存储原始数据D_t

        if len(self.raw_data) > 3:
            self.raw_data.pop(0)  # 只保留最近3个周期的原始数据


    # 计算质量
        quality_t = self.calculate_quality(self.raw_data[-3:], m_t)
        self.quality_history.append(quality_t)  # 存储历史质量数据

        c = 3
        C_t = c * (sum(self.raw_data) - (self.raw_data[-3] if len(self.raw_data) == 3 else 0))

        w = 4
        Z_t = w * m_t * D_t

        cost_t = C_t + Z_t  # 更新成本

    # 客户到达
        # 计算 N_t
        N_t = self.generate_N_t(price_t, quality_t)
        self.N_history.append(N_t)


    # 期望收益的计算
        # 计算 theta_t, n_t^w, n_t', n_t, price_hat_t, quality_hat_t
        theta_t, mu_t, sigma_2 = self.theta_t()
        nt_w = self.calculate_nt_w()
        nt_prime = self.calculate_nt_prime()
        n_t = self.calculate_nt()


        # 更新历史记录
        self.theta_history.append(theta_t)
        self.mu_history.append(mu_t)
        self.nt_w_history.append(nt_w)
        self.nt_prime_history.append(nt_prime)
        self.nt_history.append(n_t)



        # LSTM预测模块
        price_hat_t, quality_hat_t = self.predict_price_quality()
        self.price_hat_history.append(price_hat_t)
        self.quality_hat_history.append(quality_hat_t)

        # 计算期望收益
        reward_t = self.calculate_expected_reward(
            N_t, nt_w, nt_prime, n_t, price_hat_t, quality_hat_t, price_t, cost_t
        )
        self.reward_history.append(reward_t)

        reward = reward_t
        return reward


# 主函数
def main():
    env = DataProductEnvironment()  # Agent会有action,历史状态集合
    agent = Agent(state_size=env.state_space, action_size=env.action_space, seed=0)
    scores = []

    for episode in range(EPISODES):
        state = env.reset()
        score = 0
        for step in range(MAX_STEPS):
            action = agent.act(state)
            next_state, reward, done = env.step(action)
            agent.step(state, action, reward, next_state, done)
            state = next_state
            score += reward
            if done:
                break
        scores.append(score)
        print(f"Episode {episode + 1}: {score}")

    torch.save(agent.qnetwork_local.state_dict(), 'model.pth')

    return agent, scores, env.action_space


if __name__ == "__main__":
    agent, scores, action_space = main()

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2023-04-06 18:52
    关注
    不知道你这个问题是否已经解决, 如果还没有解决的话:
    • 你可以参考下这个问题的回答, 看看是否对你有帮助, 链接: https://ask.csdn.net/questions/248789
    • 这篇博客也不错, 你可以看下深度学习环境配置中软件之间逻辑与关系
    • 除此之外, 这篇博客: 如何管理和组织一个机器学习项目中的 代码编写标准 部分也许能够解决你的问题, 你可以仔细阅读以下内容或者直接跳转源博客中阅读:

      两个实用的代码编写标准是:

      1. 编写正确易懂的代码。如果你的代码是正确的,你的模型就更有可能产生好的结果,你的研究结论是正确的,你将创造出一些实际有用的东西。
      2. 确保任何人都可以复制你所做的一切——例如模型、结果、图形——通过在终端中运行一个命令(例如“python main.py”). 将有助于其他人在你的工作基础上再接再厉,也有助于“未来的你”在自己的工作基础上再接再厉。
    • 您还可以看一下 唐宇迪老师的深度学习模型部署与剪枝优化实例课程中的 常见剪枝方法介绍小节, 巩固相关知识点

    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

问题事件

  • 创建了问题 4月5日

悬赏问题

  • ¥15 gojs 点击按钮node的position位置进行改变,再次点击回到原来的位置
  • ¥15 计算决策面并仿真附上结果
  • ¥20 halcon 图像拼接
  • ¥15 webstorm上开发的vue3+vite5+typeScript打包时报错
  • ¥15 vue使用gojs,需求在link中的虚线上添加方向箭头
  • ¥15 CSS通配符清除内外边距为什么可以覆盖默认样式?
  • ¥15 SPSS分类模型实训题步骤
  • ¥100 求ASMedia ASM1184e & ASM1187e 芯片datasheet/规格书
  • ¥15 求解决扩散模型代码问题
  • ¥15 工创大赛太阳能电动车项目零基础要学什么