# A simple pursuit environment to test whether the training code converges.
import random
from collections import deque

import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal

# Fall back to CPU when no GPU is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Environment():
def __init__(self):
self.min_action = -1
self.max_action = 1
self.dim_state = 6
self.dim_action = 3
self.chase = None
self.escape = None
        self.initial_distance = None
self.time = 1
def reset(self):
self._initialize_positions()
observation = self._get_observation()
return observation
    def _initialize_positions(self):  # Initialize the spacecraft positions.
        # Note: re-seeding random with int(time.time()) on every reset makes all
        # resets within the same second identical, so no per-reset seeding is done.
        self.chase = torch.tensor([random.randint(1, 60) for _ in range(3)], dtype=torch.float32)
        self.escape = torch.tensor([random.randint(1, 60) for _ in range(3)], dtype=torch.float32)
        # Manhattan distance between pursuer and evader at reset.
        self.initial_distance = torch.sum(torch.abs(self.chase - self.escape))
    def step(self, action):  # Takes an action (array-like); returns observation, reward, done, info.
        # The pursuer moves by the continuous action; the evader holds its position.
        self.chase = self.update_state(self.chase, torch.as_tensor(action, dtype=torch.float32))
        escape_action = torch.zeros(3)
        self.escape = self.update_state(self.escape, escape_action)
        observation = self._get_observation()
        reward = self.reward()
        done = self.check_termination()
        return observation, reward, done, None
    def update_state(self, state, action):  # Apply the action as a position increment.
        return state + action
    def _get_observation(self):
        # Flat 6-dim observation: pursuer position followed by evader position.
        return torch.cat((self.chase, self.escape))
    def check_termination(self):
        # Terminate on an exact position match; note that with continuous
        # actions this equality rarely triggers.
        return bool(torch.all(self.chase == self.escape))
    def reward(self):
        distance = torch.sum(torch.abs(self.chase - self.escape))
        # Reward grows as the pursuer closes in: the smaller the distance, the larger the reward.
        reward = self.initial_distance - distance
        # Penalize leaving the [0, 60] box on any axis.
        if self.chase[0] > 60 or self.chase[1] > 60 or self.chase[2] > 60:
            reward = reward - torch.abs(self.chase[0] - 60) - torch.abs(self.chase[1] - 60) - torch.abs(self.chase[2] - 60)
        if self.chase[0] < 0 or self.chase[1] < 0 or self.chase[2] < 0:
            reward = reward - torch.abs(self.chase[0]) - torch.abs(self.chase[1]) - torch.abs(self.chase[2])
        if torch.all(self.chase == self.escape):
            # Capture bonus; note its magnitude dwarfs the shaping term above.
            reward = 1e12
        return float(reward)
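
# Optional sanity check, a minimal sketch that is not part of training: step the
# environment with a fixed action to confirm the observation shape, reward sign,
# and termination flag behave as intended before running SAC on it.
def sanity_check_env(steps=3):
    env = Environment()
    state = env.reset()
    for _ in range(steps):
        state, reward, done, _ = env.step([1.0, 0.0, 0.0])
        print(state.shape, reward, done)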
class ReplayBuffer():
    def __init__(self, buffer_maxlen):
        self.buffer = deque(maxlen=buffer_maxlen)
def push(self, data):
self.buffer.append(data)
def sample(self, batch_size):
state_list = []
action_list = []
reward_list = []
next_state_list = []
done_list = []
batch = random.sample(self.buffer, batch_size)
for experience in batch:
s, a, r, n_s, d = experience
# state, action, reward, next_state, done
state_list.append(s)
action_list.append(a)
reward_list.append(r)
next_state_list.append(n_s)
done_list.append(d)
        return torch.stack([s.to(device) for s in state_list]), \
               torch.FloatTensor(action_list).to(device), \
               torch.FloatTensor(reward_list).unsqueeze(-1).to(device), \
               torch.stack([s.to(device) for s in next_state_list]), \
               torch.FloatTensor(done_list).unsqueeze(-1).to(device)
def buffer_len(self):
return len(self.buffer)
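
# Usage sketch (shapes assume the tuples pushed in main() below):
#   buffer = ReplayBuffer(50000)
#   buffer.push((state, action, reward, next_state, done_mask))
#   s, a, r, ns, d = buffer.sample(128)  # s/ns: (128, 6), a: (128, 3), r/d: (128, 1)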
# Value Net
class ValueNet(nn.Module):
def __init__(self, state_dim, edge=3e-3):
super(ValueNet, self).__init__()
self.linear1 = nn.Linear(state_dim, 256)
self.linear2 = nn.Linear(256, 256)
        # The state value is a single scalar per state, so the head outputs one unit.
        self.linear3 = nn.Linear(256, 1)
self.linear3.weight.data.uniform_(-edge, edge)
self.linear3.bias.data.uniform_(-edge, edge)
def forward(self, state):
x = F.relu(self.linear1(state))
x = F.relu(self.linear2(x))
x = self.linear3(x)
return x
# Soft Q Net
class SoftQNet(nn.Module):
def __init__(self, state_dim, action_dim, edge=3e-3):
super(SoftQNet, self).__init__()
self.linear1 = nn.Linear(state_dim + action_dim, 256)
self.linear2 = nn.Linear(256, 256)
        # The Q value is a single scalar per (state, action) pair.
        self.linear3 = nn.Linear(256, 1)
self.linear3.weight.data.uniform_(-edge, edge)
self.linear3.bias.data.uniform_(-edge, edge)
def forward(self, state, action):
x = torch.cat([state, action], 1)
x = F.relu(self.linear1(x))
x = F.relu(self.linear2(x))
x = self.linear3(x)
return x
# Policy Net
class PolicyNet(nn.Module):
def __init__(self, state_dim, action_dim, log_std_min=-20, log_std_max=2, edge=3e-3):
super(PolicyNet, self).__init__()
self.log_std_min = log_std_min
self.log_std_max = log_std_max
self.linear1 = nn.Linear(state_dim, 256)
self.linear2 = nn.Linear(256, 256)
self.mean_linear = nn.Linear(256, action_dim)
self.mean_linear.weight.data.uniform_(-edge, edge)
self.mean_linear.bias.data.uniform_(-edge, edge)
self.log_std_linear = nn.Linear(256, action_dim)
self.log_std_linear.weight.data.uniform_(-edge, edge)
self.log_std_linear.bias.data.uniform_(-edge, edge)
def forward(self, state):
x = F.relu(self.linear1(state))
x = F.relu(self.linear2(x))
mean = self.mean_linear(x)
log_std = self.log_std_linear(x)
log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)
return mean, log_std
    def action(self, state):
        mean, log_std = self.forward(state.float().to(device))
        std = log_std.exp()
        normal = Normal(mean, std)
        z = normal.sample()
        # Squash into [-1, 1]; no gradient is needed when acting.
        action = torch.tanh(z).detach().cpu().numpy()
        return action
    # Use the re-parameterization trick so gradients flow through the sample.
    def evaluate(self, state, epsilon=1e-6):
        mean, log_std = self.forward(state)
        std = log_std.exp()
        normal = Normal(mean, std)
        z = normal.rsample()  # differentiable sample: mean + std * N(0, 1), drawn per element
        action = torch.tanh(z)
        # Tanh change-of-variables correction, summed over action dimensions:
        #   log pi(a|s) = log N(z; mean, std) - sum_i log(1 - tanh(z_i)^2 + eps)
        log_prob = normal.log_prob(z) - torch.log(1 - action.pow(2) + epsilon)
        log_prob = log_prob.sum(dim=-1, keepdim=True)
        return action, log_prob
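
# Quick shape check, a sketch under the dimensions used here (state_dim=6, action_dim=3):
#   policy = PolicyNet(6, 3).to(device)
#   a, logp = policy.evaluate(torch.zeros(128, 6, device=device))
#   a.shape -> (128, 3); logp.shape -> (128, 1), matching the critics' scalar outputs.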
class SAC:
def __init__(self, env, gamma, tau, buffer_maxlen, value_lr, q_lr, policy_lr):
self.env = env
self.state_dim = env.dim_state
self.action_dim = env.dim_action
# hyperparameters
self.gamma = gamma
self.tau = tau
# initialize networks
self.value_net = ValueNet(self.state_dim).to(device)
self.target_value_net = ValueNet(self.state_dim).to(device)
self.q1_net = SoftQNet(self.state_dim, self.action_dim).to(device)
self.q2_net = SoftQNet(self.state_dim, self.action_dim).to(device)
self.policy_net = PolicyNet(self.state_dim, self.action_dim).to(device)
        # Initialize the target value network as an exact copy of the value network.
        for target_param, param in zip(self.target_value_net.parameters(), self.value_net.parameters()):
            target_param.data.copy_(param.data)
# Initialize the optimizer
self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=value_lr)
self.q1_optimizer = optim.Adam(self.q1_net.parameters(), lr=q_lr)
self.q2_optimizer = optim.Adam(self.q2_net.parameters(), lr=q_lr)
self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=policy_lr)
        # Initialize the replay buffer
        self.buffer = ReplayBuffer(buffer_maxlen)
def get_action(self, state):
action = self.policy_net.action(state)
return action
def update(self, batch_size):
state, action, reward, next_state, done = self.buffer.sample(batch_size)
new_action, log_prob = self.policy_net.evaluate(state)
# V value loss
value = self.value_net(state)
new_q1_value = self.q1_net(state, new_action)
new_q2_value = self.q2_net(state, new_action)
next_value = torch.min(new_q1_value, new_q2_value) - log_prob
value_loss = F.mse_loss(value, next_value.detach())
# Soft q loss
q1_value = self.q1_net(state, action)
q2_value = self.q2_net(state, action)
target_value = self.target_value_net(next_state)
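        # `done` here is a continuation mask (0.0 at terminal states), so the
        # bootstrapped term vanishes when the episode has ended.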
target_q_value = reward + done * self.gamma * target_value
q1_value_loss = F.mse_loss(q1_value, target_q_value.detach())
q2_value_loss = F.mse_loss(q2_value, target_q_value.detach())
# Policy loss
policy_loss = (log_prob - torch.min(new_q1_value, new_q2_value)).mean()
# Update Policy
self.policy_optimizer.zero_grad()
policy_loss.backward()
self.policy_optimizer.step()
# Update v
self.value_optimizer.zero_grad()
value_loss.backward()
self.value_optimizer.step()
# Update Soft q
self.q1_optimizer.zero_grad()
self.q2_optimizer.zero_grad()
q1_value_loss.backward()
q2_value_loss.backward()
self.q1_optimizer.step()
self.q2_optimizer.step()
# Update target networks
for target_param, param in zip(self.target_value_net.parameters(), self.value_net.parameters()):
target_param.data.copy_(self.tau * param + (1 - self.tau) * target_param)
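
# Note: the target-network loop in update() implements the Polyak soft update
#   theta_target <- tau * theta + (1 - tau) * theta_target,
# which slowly tracks the online value network and stabilizes the bootstrap target.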
def main(env, agent, Episode, batch_size):
Return = []
action_range = [env.min_action, env.max_action]
for episode in range(Episode):
score = 0
        state = env.reset()
for i in range(200):
action = agent.get_action(state)
            # The policy outputs in [-1, 1]; rescale to the environment's action range.
            action_in = action * (action_range[1] - action_range[0]) / 2.0 + (action_range[1] + action_range[0]) / 2.0
            next_state, reward, done, _ = env.step(action_in)
done_mask = 0.0 if done else 1.0
agent.buffer.push((state, action, reward, next_state, done_mask))
            state = next_state
score += reward
if done:
break
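        # If agent.update() runs only once per episode here (rather than once per
        # environment step), moving it inside the step loop is a common fix for
        # returns that never trend upward.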
if agent.buffer.buffer_len() > 200:
agent.update(batch_size)
print("episode:{}, Return:{}, buffer_capacity:{}".format(episode, score, agent.buffer.buffer_len()))
Return.append(score)
score = 0
# env.close()
plt.plot(Return)
plt.ylabel('Return')
plt.xlabel("Episode")
plt.grid(True)
plt.show()
# torch.save(agent.policy_net, "policy-center.pkl")
if __name__ == '__main__':
env = Environment()
device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
# Params
tau = 0.01
gamma = 0.99
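    # Note: 3e-3 is on the high side for SAC; 3e-4 is a more common default and
    # is worth trying if the return curve oscillates.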
q_lr = 3e-3
value_lr = 3e-3
policy_lr = 3e-3
buffer_maxlen = 50000
Episode = 1000
batch_size = 128
agent = SAC(env, gamma, tau, buffer_maxlen, value_lr, q_lr, policy_lr)
main(env, agent, Episode, batch_size)
When I run this code it does not converge: the reward does not increase steadily. Is there a good way to fix this?