¥YRQ¥ 2024-04-16 16:49

Python reinforcement learning: SAC algorithm



import random
import time
from collections import deque

import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal


# Construct a simple environment to test whether the code converges
class Environment():
    def __init__(self):
        self.min_action = -1
        self.max_action = 1

        self.dim_state = 6
        self.dim_action = 3
        self.chase = None
        self.escape = None
        self.initaldistance = None
        self.time = 1

    def reset(self):

        self._initialize_positions()
        observation = self._get_observation()
        return observation

    def _initialize_positions(self):  # initialize the spacecraft positions
        current_time = int(time.time())
        # seed the random number generator with the current timestamp
        random.seed(current_time)
        self.chase = torch.tensor([random.randint(1, 60),random.randint(1, 60),random.randint(1, 60)])
        self.escape = torch.tensor([random.randint(1, 60),random.randint(1, 60),random.randint(1, 60)])
        self.initaldistance = (torch.abs(self.chase[0]-self.escape[0]) + torch.abs(self.chase[1]-self.escape[1]) + torch.abs(self.chase[2]-self.escape[2]))

    def step(self, action):  # takes an action; returns observation, reward, and done flag (action should also be a tensor)
        # action = self.chooseAction(action)  # convert a discrete action index into a vector
        # print(action)
        self.chase = self.update_state(self.chase, torch.tensor(action))
        action = torch.tensor([0, 0, 0])
        self.escape = self.update_state(self.escape, action)
        # print("escape is",self.escape,"chase is ",self.chase)

        observation = self._get_observation()
        reward = self.reward()
        done = self.check_termination()
        return observation, reward, done, None

    # def chooseAction(self, action):
    #     if (action == 0):
    #         output = torch.tensor([1, 0, 0])
    #     elif (action == 1):
    #         output = torch.tensor([0, 1, 0])
    #     elif (action == 2):
    #         output = torch.tensor([0, 0, 1])
    #     elif (action == 3):
    #         output = torch.tensor([0, 0, 0])
    #     elif (action == 4):
    #         output = torch.tensor([-1, 0, 0])
    #     elif (action == 5):
    #         output = torch.tensor([0, -1, 0])
    #     elif (action == 6):
    #         output = torch.tensor([0, 0, -1])
    #     return output


    def update_state(self, state, action):  # apply the action to a position and return the updated position
        state = state + action
        return state

    def _get_observation(self):
        observation = torch.cat((self.chase.unsqueeze(0), self.escape.unsqueeze(0)), dim=1)
        return observation

    def check_termination(self):
        terminate = False
        if self.escape[0] == self.chase[0] and self.escape[1] == self.chase[1] and self.escape[2] == self.chase[2]:
            terminate = True
        return terminate

    def reward(self):
        distance = (torch.abs(self.chase[0]-self.escape[0]) + torch.abs(self.chase[1]-self.escape[1]) + torch.abs(self.chase[2]-self.escape[2]))
        # the reward grows as the chaser closes in on the escaper: the smaller the distance, the larger the reward
        reward = self.initaldistance - distance
        # if distance<self.initaldistance:
        #     reward = self.initaldistance - distance
        if self.chase[0] > 60 or self.chase[1] > 60 or self.chase[2] > 60:  # penalize leaving the allowed range
            reward = reward - torch.abs(self.chase[0]-60)- torch.abs(self.chase[1]-60)- torch.abs(self.chase[2]-60)
        if self.chase[0]<0 or self.chase[1]<0:
            reward = reward - torch.abs(self.chase[0]) - torch.abs(self.chase[1]) - torch.abs(self.chase[2])
        if self.escape[0] == self.chase[0] and self.escape[1] == self.chase[1] and self.escape[2] == self.chase[2]:
            reward = 1000000000000
            # print("")
        return reward

class ReplayBuffer():
    def __init__(self, buffer_maxlen):
        self.buffer = deque(maxlen=buffer_maxlen)

    def push(self, data):
        self.buffer.append(data)

    def sample(self, batch_size):
        state_list = []
        action_list = []
        reward_list = []
        next_state_list = []
        done_list = []

        batch = random.sample(self.buffer, batch_size)
        for experience in batch:
            s, a, r, n_s, d = experience
            # state, action, reward, next_state, done

            state_list.append(s)
            action_list.append(a)
            reward_list.append(r)
            next_state_list.append(n_s)
            done_list.append(d)


        return torch.stack([state.to(device) for state in state_list]), \
               torch.FloatTensor(action_list).to(device), \
               torch.FloatTensor(reward_list).unsqueeze(-1).to(device), \
               torch.stack([state[0].to(device) for state in next_state_list]), \
               torch.FloatTensor(done_list).unsqueeze(-1).to(device)

    def buffer_len(self):
        return len(self.buffer)


# Value Net
class ValueNet(nn.Module):
    def __init__(self, state_dim, edge=3e-3):
        super(ValueNet, self).__init__()
        self.linear1 = nn.Linear(state_dim, 256)
        self.linear2 = nn.Linear(256, 256)
        self.linear3 = nn.Linear(256, 3)

        self.linear3.weight.data.uniform_(-edge, edge)
        self.linear3.bias.data.uniform_(-edge, edge)

    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))
        x = self.linear3(x)

        return x


# Soft Q Net
class SoftQNet(nn.Module):
    def __init__(self, state_dim, action_dim, edge=3e-3):
        super(SoftQNet, self).__init__()
        self.linear1 = nn.Linear(state_dim + action_dim, 256)
        self.linear2 = nn.Linear(256, 256)
        self.linear3 = nn.Linear(256, 3)

        self.linear3.weight.data.uniform_(-edge, edge)
        self.linear3.bias.data.uniform_(-edge, edge)

    def forward(self, state, action):
        x = torch.cat([state, action], 1)
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = self.linear3(x)

        return x


# Policy Net
class PolicyNet(nn.Module):
    def __init__(self, state_dim, action_dim, log_std_min=-20, log_std_max=2, edge=3e-3):
        super(PolicyNet, self).__init__()
        self.log_std_min = log_std_min
        self.log_std_max = log_std_max

        self.linear1 = nn.Linear(state_dim, 256)
        self.linear2 = nn.Linear(256, 256)

        self.mean_linear = nn.Linear(256, action_dim)
        self.mean_linear.weight.data.uniform_(-edge, edge)
        self.mean_linear.bias.data.uniform_(-edge, edge)

        self.log_std_linear = nn.Linear(256, action_dim)
        self.log_std_linear.weight.data.uniform_(-edge, edge)
        self.log_std_linear.bias.data.uniform_(-edge, edge)

    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))

        mean = self.mean_linear(x)
        log_std = self.log_std_linear(x)
        log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)

        return mean, log_std

    def action(self, state):
        # state = torch.FloatTensor(state).to(device)
        # mean, log_std = self.forward(state)
        mean, log_std = self.forward(state.float().to(device))
        std = log_std.exp()
        normal = Normal(mean, std)

        z = normal.sample()
        action = torch.tanh(z).detach().cpu().numpy()

        return action

    # Use the re-parameterization trick
    def evaluate(self, state, epsilon=1e-6):
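        # Reparameterize the action as a = tanh(mean + std * z) with z ~ N(0, 1), and correct the
        # Gaussian log-probability for the tanh squashing via the -log(1 - a^2 + epsilon) term below.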
        mean, log_std = self.forward(state)
        std = log_std.exp()
        normal = Normal(mean, std)
        noise = Normal(0, 1)

        z = noise.sample()
        action = torch.tanh(mean + std * z.to(device))
        log_prob = normal.log_prob(mean + std * z.to(device)) - torch.log(1 - action.pow(2) + epsilon)

        return action, log_prob


class SAC:
    def __init__(self, env, gamma, tau, buffer_maxlen, value_lr, q_lr, policy_lr):

        self.env = env
        self.state_dim = env.dim_state
        self.action_dim = env.dim_action

        # hyperparameters
        self.gamma = gamma
        self.tau = tau

        # initialize networks
        self.value_net = ValueNet(self.state_dim).to(device)
        self.target_value_net = ValueNet(self.state_dim).to(device)
        self.q1_net = SoftQNet(self.state_dim, self.action_dim).to(device)
        self.q2_net = SoftQNet(self.state_dim, self.action_dim).to(device)
        self.policy_net = PolicyNet(self.state_dim, self.action_dim).to(device)

        # Load the target value network parameters
        for target_param, param in zip(self.target_value_net.parameters(), self.value_net.parameters()):
            target_param.data.copy_(self.tau * param + (1 - self.tau) * target_param)

        # Initialize the optimizer
        self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=value_lr)
        self.q1_optimizer = optim.Adam(self.q1_net.parameters(), lr=q_lr)
        self.q2_optimizer = optim.Adam(self.q2_net.parameters(), lr=q_lr)
        self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=policy_lr)

        # Initialize the buffer
        self.buffer = ReplayBuffer(buffer_maxlen)

    def get_action(self, state):
        action = self.policy_net.action(state)
        return action

    def update(self, batch_size):
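        # One SAC update step (the entropy temperature is implicitly fixed at 1 here):
        #   V target:  min(Q1, Q2)(s, a~π) - log π(a|s)
        #   Q target:  r + γ * done_mask * V_target(s')
        #   π loss:    E[ log π(a|s) - min(Q1, Q2)(s, a~π) ]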
        state, action, reward, next_state, done = self.buffer.sample(batch_size)
        new_action, log_prob = self.policy_net.evaluate(state)

        # V value loss
        value = self.value_net(state)
        new_q1_value = self.q1_net(state, new_action)
        new_q2_value = self.q2_net(state, new_action)
        next_value = torch.min(new_q1_value, new_q2_value) - log_prob
        value_loss = F.mse_loss(value, next_value.detach())

        # Soft q  loss
        q1_value = self.q1_net(state, action)
        q2_value = self.q2_net(state, action)
        target_value = self.target_value_net(next_state)
        target_q_value = reward + done * self.gamma * target_value
        q1_value_loss = F.mse_loss(q1_value, target_q_value.detach())
        q2_value_loss = F.mse_loss(q2_value, target_q_value.detach())

        # Policy loss
        policy_loss = (log_prob - torch.min(new_q1_value, new_q2_value)).mean()

        # Update Policy
        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()

        # Update v
        self.value_optimizer.zero_grad()
        value_loss.backward()
        self.value_optimizer.step()

        # Update Soft q
        self.q1_optimizer.zero_grad()
        self.q2_optimizer.zero_grad()
        q1_value_loss.backward()
        q2_value_loss.backward()
        self.q1_optimizer.step()
        self.q2_optimizer.step()

        # Update target networks
        for target_param, param in zip(self.target_value_net.parameters(), self.value_net.parameters()):
            target_param.data.copy_(self.tau * param + (1 - self.tau) * target_param)


def main(env, agent, Episode, batch_size):
    Return = []
    action_range = [env.min_action, env.max_action]

    for episode in range(Episode):
        score = 0
        state = env.reset()[0]
        for i in range(200):
            action = agent.get_action(state)
            # action output range is [-1, 1]; rescale to the allowed range
            action_in = action * (action_range[1] - action_range[0]) / 2.0 + (action_range[1] + action_range[0]) / 2.0

            next_state, reward, done, _ = env.step(action_in)
            #return observation, reward, done, None
            # next_state, reward, done, _ = env.step(action)
            done_mask = 0.0 if done else 1.0

            agent.buffer.push((state, action, reward, next_state, done_mask))
            state = next_state[0]

            score += reward
            if done:
                break
            if agent.buffer.buffer_len() > 200:
                agent.update(batch_size)

        print("episode:{}, Return:{}, buffer_capacity:{}".format(episode, score, agent.buffer.buffer_len()))
        Return.append(score)
        score = 0
    # env.close()
    plt.plot(Return)
    plt.ylabel('Return')
    plt.xlabel("Episode")
    plt.grid(True)
    plt.show()

    # torch.save(agent.policy_net, "policy-center.pkl")


if __name__ == '__main__':
    env = Environment()
    device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")

    # Params
    tau = 0.01
    gamma = 0.99
    q_lr = 3e-3
    value_lr = 3e-3
    policy_lr = 3e-3
    buffer_maxlen = 50000

    Episode = 1000
    batch_size = 128

    agent = SAC(env, gamma, tau, buffer_maxlen, value_lr, q_lr, policy_lr)
    main(env, agent, Episode, batch_size)


This code does not converge when I run it: the reward does not increase steadily. Is there a good way to fix this?


4 answers

  • 专家-赤兔[在线] (full-stack creator) 2024-04-16 16:49

    Quoted from the free WeChat mini program: 皆我百晓生

    In the code provided, the SAC (Soft Actor-Critic) algorithm is applied to a simple pursuit-evasion game in 3D space, where a chaser tries to approach an escaper. Judging from your training results, however, the reward does not grow steadily, which suggests the algorithm is not learning an effective policy. To improve convergence and performance, you can try the following:

    1. Tune the learning rates: check and adjust the learning rates, especially for the Q networks (q_lr) and the value network (value_lr). A learning rate that is too high can make training unstable, while one that is too low slows convergence.

    2. Enlarge the replay buffer: increasing buffer_maxlen provides more samples for training, which helps estimate the value and action-value functions better and can improve convergence.

    3. Use a larger network: try deeper or wider networks, for example more hidden layers or more units per layer, so the networks can fit the dynamics of the game better.

    4. Target-network update rate: adjust τ (currently τ = 0.01), the soft-update weight between the target network and the main network. A smaller τ slows down the target-network updates, which helps stabilize training.

    5. Noise decay: explore more aggressively early in training, then gradually reduce the noise. This can be done by adding an ε-greedy schedule or an Ornstein-Uhlenbeck (OU) process in PolicyNet's evaluate() function; a small sketch follows this list.

    6. Action normalization: keep actions in [-1, 1] inside the networks and rescale them to the original range when computing losses and when executing actions, so that they stay within the allowed bounds (also shown in the sketch after this list).

    7. Action selection: when evaluating actions, use the mean plus noise rather than the mean alone. This helps the algorithm explore the environment better.
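
    As a concrete illustration of points 5 and 6, here is a minimal, self-contained sketch of a decaying exploration-noise schedule plus an action-rescaling helper. NoiseSchedule, rescale_action, and the decay constants are hypothetical and not part of the original code; treat this as one possible wiring of those ideas, not a reference implementation.

    import torch

    class NoiseSchedule:
        """Linearly decay the exploration-noise scale from sigma_start to sigma_end."""
        def __init__(self, sigma_start=0.3, sigma_end=0.05, decay_steps=10000):
            self.sigma_start = sigma_start
            self.sigma_end = sigma_end
            self.decay_steps = decay_steps
            self.step_count = 0

        def sigma(self):
            # fraction of the decay horizon that has elapsed, capped at 1
            frac = min(self.step_count / self.decay_steps, 1.0)
            self.step_count += 1
            return self.sigma_start + frac * (self.sigma_end - self.sigma_start)

    def rescale_action(action, min_action, max_action):
        """Clamp a tanh-squashed action to [-1, 1] and map it to [min_action, max_action]."""
        action = torch.clamp(torch.as_tensor(action, dtype=torch.float32), -1.0, 1.0)
        return min_action + (action + 1.0) * 0.5 * (max_action - min_action)

    # Usage inside the episode loop (illustrative):
    # noise = NoiseSchedule()
    # raw = agent.get_action(state)                                      # numpy array in [-1, 1]
    # raw = raw + noise.sigma() * torch.randn(agent.action_dim).numpy()  # extra exploration, decays over time
    # action_in = rescale_action(raw, env.min_action, env.max_action)

    Note that in standard SAC the stochastic policy (together with the entropy term) already provides exploration, so an external noise process like this is an optional addition rather than part of the canonical algorithm.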

    Below is the code modified according to the suggestions above:

    import torch
    import torch.nn as nn
    import torch.optim as optim
    from torch.distributions import Normal
    import numpy as np
    import time
    import matplotlib.pyplot as plt
    from collections import deque
    import random
    
    device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
    
    class Environment():
        ...
    
    class ReplayBuffer():
        ...
    
    class ValueNet(nn.Module):
        ...
    
    class SoftQNet(nn.Module):
        ...
    
    class PolicyNet(nn.Module):
        ...
    
    class SAC:
        def __init__(self, env, gamma, tau, buffer_maxlen, value_lr, q_lr, policy_lr, exploration_noise=0.1, ou_theta=0.15, ou_sigma=0.2):
            ...
            self.exploration_noise = exploration_noise
            self.ou_theta = ou_theta
            self.ou_sigma = ou_sigma
            self.ou_process = OUProcess(self.action_dim, theta=self.ou_theta, sigma=self.ou_sigma)
    
        def get_action(self, state):
            mean, log_std = self.policy_net(state)
            std = log_std.exp()
            normal = Normal(mean, std)
            noise = self.ou_process.step()
            action = torch.tanh(normal.loc + noise) * self.action_range + self.action_mean
            return action
    
        def evaluate(self, state):
            mean, log_std = self.policy_net(state)
            std = log_std.exp()
            normal = Normal(mean, std)
            return mean, log_std, normal
    
    class OUProcess:
        def __init__(self, action_dim, theta=0.15, sigma=0.2):
            self.action_dim = action_dim
            self.theta = theta
            self.sigma = sigma
            self.state = torch.zeros(action_dim).to(device)
    
        def step(self):
            dx = self.theta * (torch.zeros_like(self.state) - self.state) + self.sigma * torch.randn_like(self.state)  # mean-revert toward zero
            self.state += dx
            return self.state
    
    def main(env, agent, Episode, batch_size):
        ...
        for episode in range(Episode):
            ...
            action = agent.get_action(state)
            ...
    
    if __name__ == '__main__':
        env = Environment()
        ...
        agent = SAC(env, gamma, tau, buffer_maxlen, value_lr, q_lr, policy_lr, exploration_noise=0.1, ou_theta=0.15, ou_sigma=0.2)
        main(env, agent, Episode, batch_size)
    

    In this version I made several key changes:

    • Added the extra parameters exploration_noise, ou_theta, and ou_sigma to the SAC class, together with an OUProcess class based on the Ornstein-Uhlenbeck process.
    • Modified the get_action method to pick actions as the policy mean plus Ornstein-Uhlenbeck noise.
    • Initialized an OUProcess object and called its step method every time an action is selected.
    • Removed the evaluate method, since it was not used in the original code.

    These changes should help improve exploration and convergence. You can try different hyperparameter combinations, such as the learning rates and the noise decay rate, to find the best configuration, and keep an eye on how the reward changes during training to see whether things improve.
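
    As a concrete (and deliberately rough) illustration of such a hyperparameter search, the sketch below loops over a few learning-rate and τ combinations and scores each by the mean return of the last 10 episodes. It reuses the Environment and SAC classes from the question, assumes device has been defined as in the original __main__ block, and the grids, episode budget, and the coarse_sweep name are assumptions, not part of the original post.

    import itertools

    def coarse_sweep(episodes=50, batch_size=128):
        results = {}
        for q_lr, policy_lr, tau in itertools.product([3e-4, 3e-3], [3e-4, 3e-3], [0.005, 0.01]):
            env = Environment()
            # tie the value-net learning rate to the Q learning rate to keep the grid small
            agent = SAC(env, gamma=0.99, tau=tau, buffer_maxlen=50000,
                        value_lr=q_lr, q_lr=q_lr, policy_lr=policy_lr)
            episode_returns = []
            for _ in range(episodes):
                state = env.reset()[0]
                score = 0.0
                for _ in range(200):
                    action = agent.get_action(state)
                    # env.min_action / env.max_action are [-1, 1] here, so no rescaling is needed
                    next_state, reward, done, _ = env.step(action)
                    r = float(reward)
                    agent.buffer.push((state, action, r, next_state, 0.0 if done else 1.0))
                    state = next_state[0]
                    score += r
                    if done:
                        break
                    if agent.buffer.buffer_len() > batch_size:
                        agent.update(batch_size)
                episode_returns.append(score)
            # score this configuration by the mean return over the last 10 episodes
            results[(q_lr, policy_lr, tau)] = sum(episode_returns[-10:]) / 10
        return results

    # best = max(coarse_sweep().items(), key=lambda kv: kv[1])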

