Jeff Enhanced Ver4.0 · 2019-08-21 14:49 · closed

High bounty: why does my deep-reinforcement-learning Actor-Critic agent for the CartPole game in the gym library fail to converge?

I have recently been teaching myself deep reinforcement learning by following Morvan's (莫烦) video tutorials. One of them uses the Actor-Critic (AC) algorithm to play the CartPole game from the gym library, but the version I wrote myself will not converge and I cannot work out why. Since my program puts the actor and critic networks into a single class, I tried cutting off the critic network's gradient backpropagation while training the actor to prevent interference, but it still does not converge.
I am a beginner fumbling through this on my own and really cannot find the cause, so I am offering a high bounty and hoping someone can clear it up. The code is below, in two files: the main program that you run, and the class that the main program imports. Please give it a try.
Also, this is a sincere question; please do not copy-paste answers that have nothing to do with it.

######## Main program: AC_RL_run_this ##########

import gym
from AC_RL_brain import ACNetwork


def run_game():
    step = 0
    for episode in range(100000):
        episode_reward = 0
        observation = env.reset()
        while True:
            if episode_reward > 20:
                env.render()
            action = RL.choose_action(observation)
            observation_, reward, done, _ = env.step(action)
            if done:
                reward = -20
            RL.C_learn(observation, reward, observation_)
            RL.A_learn(observation, action)
            episode_reward += reward
            if done:
                break
            observation = observation_
            step += 1
        print('Episode %d total reward: %f' % (episode, episode_reward))
    print('game over')
    env.close()


if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    env.seed(1)
    RL = ACNetwork(
        n_actions=env.action_space.n,
        n_features=env.observation_space.shape[0],
        gamma=0.95,
        A_lr=0.001,
        C_lr=0.01,
    )
    run_game()

######## Imported class: AC_RL_brain ##########

import tensorflow as tf
import numpy as np


np.random.seed(2)
tf.set_random_seed(2)  # reproducible


class ACNetwork:
    def __init__(
            self,
            n_actions,
            n_features,
            gamma,
            A_lr,
            C_lr,
    ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.gamma = gamma
        self.A_lr = A_lr
        self.C_lr = C_lr
        self.td_error_real = 0
        self._build_net()
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())

    def _build_net(self):
        # placeholder
        self.s = tf.placeholder(tf.float32, [1, self.n_features], "state")
        self.v_ = tf.placeholder(tf.float32, [1, 1], "v_next")
        self.r = tf.placeholder(tf.float32, None, 'r')
        self.a = tf.placeholder(tf.int32, None, "act")

        # A_net
        l1_A = tf.layers.dense(
            inputs=self.s,
            units=20,  # number of hidden units
            activation=tf.nn.relu,
            kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
            bias_initializer=tf.constant_initializer(0.1),  # biases
        )

        self.acts_prob = tf.layers.dense(
            inputs=l1_A,
            units=self.n_actions,  # output units
            activation=tf.nn.softmax,  # get action probabilities
            kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
            bias_initializer=tf.constant_initializer(0.1),  # biases
        )

        self.log_prob = tf.log(self.acts_prob[0, self.a])
        self.exp_v = tf.reduce_mean(self.log_prob * self.td_error_real)  # advantage (TD_error) guided loss
        self.train_op_A = tf.train.AdamOptimizer(self.A_lr).minimize(-self.exp_v)  # minimize(-exp_v) = maximize(exp_v)

        # C_net
        l1_C = tf.layers.dense(
            inputs=self.s,
            units=20,  # number of hidden units
            activation=tf.nn.relu,  # could also be None (linear)
            # a linear critic is sometimes recommended to help the actor converge,
            # but a linear approximator hardly learns the correct value function.
            kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
            bias_initializer=tf.constant_initializer(0.1),  # biases
        )

        self.v = tf.layers.dense(
            inputs=l1_C,
            units=1,  # output units
            activation=None,
            kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
            bias_initializer=tf.constant_initializer(0.1),  # biases
        )

        self.td_error = self.r + self.gamma * self.v_ - self.v
        self.loss = tf.square(self.td_error)  # TD_error = (r+gamma*V_next) - V_eval
        self.train_op_C = tf.train.AdamOptimizer(self.C_lr).minimize(self.loss)

    def choose_action(self, s):
        s = s[np.newaxis, :]
        probs = self.sess.run(self.acts_prob, {self.s: s})  # get probabilities for all actions
        return np.random.choice(np.arange(probs.shape[1]), p=probs.ravel())  # return a int

    def A_learn(self, s, a):
        s = s[np.newaxis, :]
        feed_dict = {self.s: s, self.a: a}
        _, exp_v = self.sess.run([self.train_op_A, self.exp_v], feed_dict)

    def C_learn(self, s, r, s_):
        s, s_ = s[np.newaxis, :], s_[np.newaxis, :]
        v_ = self.sess.run(self.v, {self.s: s_})
        self.td_error_real, _ = self.sess.run([self.td_error, self.train_op_C],
                                    {self.s: s, self.v_: v_, self.r: r})
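
A note on the gradient-truncation idea mentioned above: in tutorial-style TF 1.x actor-critic code, the TD error normally reaches the actor either through its own placeholder or wrapped in tf.stop_gradient, so the actor update sees the critic's estimate as a plain number computed at run time rather than as a Python attribute fixed when the graph is built. A minimal sketch of that wiring (the helper build_actor_update and the placeholder name td_error_ph are hypothetical, not part of the posted code):

import tensorflow as tf

def build_actor_update(acts_prob, a_ph, lr):
    """Hypothetical helper: actor update where the TD error is fed in as data."""
    td_error_ph = tf.placeholder(tf.float32, None, "td_error")
    log_prob = tf.log(acts_prob[0, a_ph])
    exp_v = tf.reduce_mean(log_prob * td_error_ph)          # policy-gradient objective
    train_op = tf.train.AdamOptimizer(lr).minimize(-exp_v)  # minimize(-exp_v) = maximize(exp_v)
    return td_error_ph, train_op

# Usage sketch: run the critic step first to obtain a numeric TD error,
# then feed that value into the actor step, e.g.
#   td_err = sess.run(td_error, {s: obs, v_: v_next, r: reward})
#   sess.run(train_op, {s: obs, a: action, td_error_ph: td_err})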


1 answer


import math
import random

import numpy as np
import cntk as C


class Brain:
    def __init__(self):
        self.params = {}
        self.model, self.trainer, self.loss = self._create()

    def _create(self):
        observation = C.sequence.input_variable(n_state, np.float32, name='s')
        q_target = C.sequence.input_variable(n_action, np.float32, name='q')

        l1 = C.layers.Dense(hidden_size, activation=C.relu)
        l2 = C.layers.Dense(n_action)
        unbound_model = C.layers.Sequential([l1, l2])
        self.model = unbound_model(observation)

        self.params = dict(W1=l1.W, b1=l1.b, W2=l2.W, b2=l2.b)

        self.loss = C.reduce_mean(C.square(self.model - q_target), axis=0)
        meas = C.reduce_mean(C.square(self.model - q_target), axis=0)

        lr_schedule = C.learning_rate_schedule(learning_rate, C.UnitType.minibatch)
        learner = C.sgd(self.model.parameters,
                        lr_schedule,
                        gradient_clipping_threshold_per_sample=10)

        progress_printer = C.logging.ProgressPrinter(500)
        self.trainer = C.Trainer(self.model, (self.loss, meas), learner, progress_printer)

        return self.model, self.trainer, self.loss

    def train(self, x, y):
        arguments = dict(zip(self.loss.arguments, [x, y]))
        updated, results = self.trainer.train_minibatch(arguments, outputs=[self.loss.output])

    def predict(self, s):
        return self.model.eval([s])


class Memory:  # experiences stored as (s, a, r, s_)
    samples = []

    def __init__(self):
        pass

    def add(self, sample):
        self.samples.append(sample)

    def sample(self, n):
        n = min(n, len(self.samples))
        return random.sample(self.samples, n)


class Agent:
    steps = 0
    epsilon = max_epsilon

    def __init__(self):
        self.brain = Brain()
        self.memory = Memory()

    def act(self, s):
        # epsilon-greedy action selection
        if random.random() < self.epsilon:
            return random.randint(0, n_action - 1)
        else:
            return np.argmax(self.brain.predict(s))

    def observe(self, sample):  # in (s, a, r, s_) format
        self.memory.add(sample)
        self.steps += 1
        self.epsilon = min_epsilon + (max_epsilon - min_epsilon) * math.exp(-epsilon_decay * self.steps)

    def replay(self):
        batch = self.memory.sample(batch_size)

        no_state = np.zeros(n_state)

        states = np.array([o[0] for o in batch], dtype=np.float32)
        states_ = np.array([(no_state if o[3] is None else o[3]) for o in batch], dtype=np.float32)

        p = self.brain.predict(states)
        p_ = self.brain.predict(states_)

        x = np.zeros((len(batch), n_state)).astype(np.float32)
        y = np.zeros((len(batch), n_action)).astype(np.float32)

        for i in range(len(batch)):
            s, a, r, s_ = batch[i]

            t = p[0][i]  # CNTK: [0] because of the sequence dimension
            if s_ is None:
                t[a] = r
            else:
                t[a] = r + reward_discount * np.amax(p_[0][i])

            x[i] = s
            y[i] = t

        self.brain.train(x, y)


def run(agent):
    s = env.reset()
    R = 0

    while True:
        env.render()

        a = agent.act(s.astype(np.float32))
        s_, r, done, info = env.step(a)

        if done:
            s_ = None

        agent.observe((s, a, r, s_))
        agent.replay()

        s = s_
        R += r

        if done:
            return R


agent = Agent()

epoch = 0
reward_sum = 0
while epoch < 30000:
    reward = run(agent)
    reward_sum += reward
    epoch += 1
    if epoch % epoch_baseline == 0:
        print('Epoch %d, average reward is %f, memory size is %d'
              % (epoch, reward_sum / epoch_baseline, len(agent.memory.samples)))

        if reward_sum / epoch_baseline > reward_target:
            print('Task solved in %d epoch' % epoch)
            break

        reward_sum = 0
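
The reply above references several module-level names (env, n_state, n_action, hidden_size, learning_rate, max_epsilon, min_epsilon, epsilon_decay, batch_size, reward_discount, epoch_baseline, reward_target) that it never defines. One possible setup, to be placed before the classes, with assumed values that are not from the original reply:

import gym

# Assumed values -- the original reply never defines these names.
env = gym.make('CartPole-v0')
n_state = env.observation_space.shape[0]  # 4 for CartPole-v0
n_action = env.action_space.n             # 2 for CartPole-v0
hidden_size = 64
learning_rate = 0.00025
max_epsilon = 1.0
min_epsilon = 0.01
epsilon_decay = 0.0001
batch_size = 64
reward_discount = 0.99
epoch_baseline = 100   # episodes per logging/averaging window
reward_target = 195    # CartPole-v0 is considered solved at an average of 195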
    