import random
import numpy as np
import gymnasium as gym
from frozenlake import (f_basis_frozenlake, argmaxuQ_f_basis_frozenlake,
                        predict_linear, get_rmstde)
def lspi(D, pi0, model, argmaxuQ, gamma, p):
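    """Least-Squares Policy Iteration (LSPI).

    Alternates policy evaluation (LSTD-Q on the batch D) with greedy policy
    improvement until the weight vector stops changing or iter_max is reached.
    D holds states Y, actions U, rewards R, and successor states Yn,
    one sample per column.
    """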
dtheta_converge = p.get("dtheta_converge", 1e-9)
iter_max = p.get("iter_max", 100)
online_printing = p.get("online_printing", True)
Y = D["Y"]
U = D["U"]
R = D["R"]
Yn = D["Yn"]
dimY = Y.shape[0]
N = Y.shape[1]
dimU = U.shape[0]
Un = np.zeros((dimU, N))
pi = pi0
for n_iter in range(iter_max):
        for n in range(N):  # evaluate the current policy at every successor state
Un[:, n] = pi(Yn[:, n])
model = lstd0q(Y, U, R.T, Yn, Un, model, gamma, [])
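        # Assumption: predict_linear from the frozenlake helpers returns a
        # cost-like value, hence the sign flip below to get a Q-value to maximize.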
def Q(y, u): return -predict_linear(np.concatenate((y, u), axis=0), model)
pi = argmaxuQ(model["w"])
if online_printing:
Qp = np.empty(N)
Qpn = np.empty(N)
            for n in range(N):  # fill every entry; np.empty does not zero-initialize
Qp[n] = Q(Y[:, n], U[:, n])[0]
Qpn[n] = Q(Yn[:, n], Un[:, n])[0]
rmstde = get_rmstde(Qp, Qpn, R, gamma)[0]
print(f"Iteration = {n_iter}, RMSTDE = {rmstde:.6f}, Qp = {np.mean(Qp):.6f} +/- {np.std(Qp):.6f}")
        if n_iter > 0:
            dtheta = np.linalg.norm(np.ravel(model["w"]) - w_prev)
            if dtheta < dtheta_converge:
                print('Minimum parameter change threshold reached.')
                break
        # lstd0 mutates `model` in place, so saving the dict itself (modelp = model)
        # would alias the weights and make dtheta always zero; copy the weights instead.
        w_prev = np.ravel(model["w"]).copy()
    if n_iter == iter_max - 1 and online_printing:
print('Max. number of iterations reached.')
return pi, Q, model, n_iter
def lstd0q(Y, U, R, Yn, Un, model, gamma, p):
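    """LSTD(0) for Q-functions: stack state and action into a single feature input."""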
Z = np.concatenate((Y, U), axis=0)
Zn = np.concatenate((Yn, Un), axis=0)
model = lstd0(Z, R, Zn, model, gamma, p)
return model
def lstd0(Y, R, Yn, model, gamma, p):
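    """LSTD(0) policy evaluation: build feature matrices Phi and Phin for the
    current and successor samples, then solve for the linear weights."""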
N = Y.shape[1]
dimPhi = model['phi'](Y[:, 0]).shape[0]
Phi = np.empty((N, dimPhi))
Phin = np.empty((N, dimPhi))
    for n in range(N):  # fill every row of Phi and Phin; np.empty leaves garbage otherwise
Phi[n, :] = model['phi'](Y[:, n]).ravel()
Phin[n, :] = model['phi'](Yn[:, n]).ravel()
model['w'] = wlstd0(Phi, Phin, R, gamma)
return model
def wlstd0(Phi, Phin, R, gamma):
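    """Solve the regularized LSTD(0) fixed point
        A = Phi^T (Phi - gamma * Phin) + lambda * I,  b = Phi^T R,  w = A^{-1} b
    via least squares."""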
    # Regularized LSTD(0) system A w = b; the single ridge term keeps A invertible.
    A = np.dot(Phi.T, Phi - gamma * Phin) + 0.01 * np.eye(Phi.shape[1])
    b = np.dot(Phi.T, R)
theta, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
return theta
def epsilon_greedy_policy(environment, state, policy, epsilon=0.1):
    """Follow the given policy, but take a random action with probability epsilon."""
    if random.uniform(0, 1) < epsilon:
        return [environment.action_space.sample()]  # randomly choose an action
    else:
        return list(policy(state))
def evaluate_policy(environment, policy, K, epsilon=0.1):
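    """Roll out the policy for at most K steps; return states, actions, and costs."""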
y = []
u = []
l = []
state, info = environment.reset()
done = False
for i in range(K):
        action = epsilon_greedy_policy(environment, state, policy, epsilon)
next_state, cost, done, truncated, info = environment.step(action[0])
y.append(state)
u.append(action)
l.append(cost)
state = next_state
if done or truncated:
break
y = np.array(y) # Convert the list of states to a numpy array
l = np.array(l) # Convert the list of costs to a numpy array
u = np.array(u) # Convert the list of actions to a numpy array
return y, u, l
def evaluate_lspi(environment):
"""Evaluate the LSPI algorithm on the given environment and data."""
p = dict()
gamma = 0.9 # Discount factor, adjust according to your needs
p["iter_max"] = 100 # Maximum number of iterations
p["dtheta_converge"] = 0.01 # Parameter convergence threshold
# Function to find the action that maximizes Q-value given theta
def fnArgmaxuQ(_theta):
return argmaxuQ_f_basis_frozenlake(_theta, environment.unwrapped)
# Basis function for feature representation
def phi(z):
return f_basis_frozenlake(z[0], z[1], environment.unwrapped)
# Initialize the model
model = dict()
model["phi"] = phi
model["w"] = np.zeros(environment.unwrapped.observation_space.n * environment.unwrapped.action_space.n)
# Initial policy that always selects action 0
def pi0(x):
return [0]
# Generate training data
data = {"Y": [], "U": [], "R": [], "Yn": []}
num_episodes = 1000 # Number of episodes to generate data
for _ in range(num_episodes):
state, info = environment.reset()
        done = truncated = False
        while not (done or truncated):  # also stop when the episode is truncated
action = environment.action_space.sample() # Sample a random action
next_state, reward, done, truncated, info = environment.step(action) # Step in the environment
data["Y"].append(state) # Append current state to data
data["U"].append(action) # Append action to data
data["R"].append(reward) # Append reward to data
data["Yn"].append(next_state) # Append next state to data
state = next_state # Update state to next state
for key in data:
data[key] = np.array(data[key]).reshape(1, -1) # Convert lists to numpy arrays
D = data # Training data
# Train LSPI to get the learned policy
pi, Q, model, n_iter = lspi(D, pi0, model, fnArgmaxuQ, gamma, p)
# Evaluate the learned policy
episodes = 100 # Number of evaluation episodes
total_costs = []
for _ in range(episodes):
y, u, l = evaluate_policy(environment, pi, 10) # Evaluate for a maximum of 10 steps per episode
        total_costs.append(sum(l))  # total episode return (rewards stored as costs)
avg_cost = np.mean(total_costs) # Calculate average cost
std_cost = np.std(total_costs) # Calculate standard deviation of cost
# Print evaluation results for debugging
print(f"Average Cost: {avg_cost}, Standard Deviation of Cost: {std_cost}")
# Return the learned policy and the number of iterations
return pi, n_iter
def main():
random.seed(1)
np.random.seed(1)
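    # Note: render_mode="human" animates every environment step, including the
    # 1000 random data-collection episodes inside evaluate_lspi, so the window
    # keeps playing long before evaluation starts.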
environment = gym.make('FrozenLake-v1', desc=["SFFF", "FFFF", "FFFF", "HHHG"], map_name="4x4", is_slippery=False,
render_mode="human")
policy, n_iter = evaluate_lspi(environment)
# Evaluate the learned policy with a maximum of 5 steps
y, u, l = evaluate_policy(environment, policy, 5)
print("u =", u)
print("y =", y)
print("l =", l)
print('Cost (evaluated on plant) = ' + str(sum(l)))
if __name__ == "__main__":
main()
My question is: why does the agent keep moving forever? I thought I had set things up so that it stops once it reaches the goal and the program ends.