Hi, I am trying to create an environment that is a variation of CartPole.
Starting from the CartPole definition: suppose you can apply a force F, but also a multiplier M of this force, so the total force applied is F * M.
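To make the encoding concrete, here is a minimal sketch (my reading of the step() code below) of how a MultiDiscrete([2, 4]) action maps to the total force; total_force is a hypothetical helper for illustration, not part of the training code:

def total_force(action, force_mag=5.0):
    # action[0] in {0, 1} picks the direction; action[1] in {0, ..., 3} gives M = action[1] + 1
    direction = 1.0 if action[0] == 1 else -1.0
    return direction * force_mag * (action[1] + 1)

assert total_force([1, 0]) == 5.0    # +F * 1
assert total_force([0, 3]) == -20.0  # -F * 4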
# PPO-LSTM
import math

import gym
import gym.envs.classic_control
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from gym import logger
from torch.distributions import Categorical
# Hyperparameters
learning_rate = 0.0005
gamma = 0.98
lmbda = 0.95
eps_clip = 0.1
K_epoch = 2
T_horizon = 20
class CustomCartpole(gym.envs.classic_control.CartPoleEnv):
    """Add a dimension to the cartpole action space that is used as a 'speed' button."""

    def __init__(self, env_config):
        super().__init__()
        self.force_mag = 5.0  # base force F; the multiplier scales it up to 4x
        # action = [direction (2 values), force-multiplier index (4 values)]
        self.action_space = gym.spaces.MultiDiscrete([2, 4])
def step(self, action):
err_msg = "%r (%s) invalid" % (action, type(action))
assert self.action_space.contains(action), err_msg
x, x_dot, theta, theta_dot = self.state
        force = self.force_mag if action[0] == 1 else -self.force_mag
        force *= (action[1] + 1)  # multiplier M in {1, 2, 3, 4}
costheta = math.cos(theta)
sintheta = math.sin(theta)
temp = (force + self.polemass_length * theta_dot ** 2 * sintheta) / self.total_mass
thetaacc = (self.gravity * sintheta - costheta * temp) / (self.length * (4.0 / 3.0 - self.masspole * costheta ** 2 / self.total_mass))
xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass
if self.kinematics_integrator == 'euler':
x = x + self.tau * x_dot
x_dot = x_dot + self.tau * xacc
theta = theta + self.tau * theta_dot
theta_dot = theta_dot + self.tau * thetaacc
else: # semi-implicit euler
x_dot = x_dot + self.tau * xacc
x = x + self.tau * x_dot
theta_dot = theta_dot + self.tau * thetaacc
theta = theta + self.tau * theta_dot
self.state = (x, x_dot, theta, theta_dot)
done = bool(
x < -self.x_threshold
or x > self.x_threshold
or theta < -self.theta_threshold_radians
or theta > self.theta_threshold_radians
)
if not done:
reward = 1.0
elif self.steps_beyond_done is None:
# Pole just fell!
self.steps_beyond_done = 0
reward = 1.0
else:
if self.steps_beyond_done == 0:
logger.warn(
"You are calling 'step()' even though this "
"environment has already returned done = True. You "
"should always call 'reset()' once you receive 'done = "
"True' -- any further steps are undefined behavior."
)
self.steps_beyond_done += 1
reward = 0.0
return np.array(self.state), reward, done, {}
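# Quick sanity check for the custom env (a sketch, not used by training):
# roll a few random MultiDiscrete actions through step(), assuming the old
# gym API where step() returns the 4-tuple (obs, reward, done, info).
def sanity_check_env(n_steps=10):
    env = CustomCartpole({})
    s = env.reset()
    for _ in range(n_steps):
        a = env.action_space.sample()  # e.g. np.array([1, 2])
        s, r, done, info = env.step(a)
        if done:
            s = env.reset()
    env.close()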
class PPO(nn.Module):
def __init__(self):
super(PPO, self).__init__()
self.data = []
self.fc1 = nn.Linear(4,64)
self.lstm = nn.LSTM(64,32)
        # 2 directions x 4 multipliers = 8 flat actions, one logit per pair
        self.fc_pi = nn.Linear(32, 8)
        self.fc_v = nn.Linear(32, 1)  # state value is a scalar
self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
def pi(self, x, hidden):
x = F.relu(self.fc1(x))
x = x.view(-1, 1, 64)
x, lstm_hidden = self.lstm(x, hidden)
x = self.fc_pi(x)
prob = F.softmax(x, dim=2)
return prob, lstm_hidden
def v(self, x, hidden):
x = F.relu(self.fc1(x))
x = x.view(-1, 1, 64)
x, lstm_hidden = self.lstm(x, hidden)
v = self.fc_v(x)
return v
def put_data(self, transition):
self.data.append(transition)
def make_batch(self):
s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, h_in_lst, h_out_lst, done_lst = [], [], [], [], [], [], [], []
for transition in self.data:
s, a, r, s_prime, prob_a, h_in, h_out, done = transition
s_lst.append(s)
a_lst.append([a])
r_lst.append([r])
s_prime_lst.append(s_prime)
prob_a_lst.append([prob_a])
h_in_lst.append(h_in)
h_out_lst.append(h_out)
done_mask = 0 if done else 1
done_lst.append([done_mask])
s,a,r,s_prime,done_mask,prob_a = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
torch.tensor(done_lst, dtype=torch.float), torch.tensor(prob_a_lst)
self.data = []
return s,a,r,s_prime, done_mask, prob_a, h_in_lst[0], h_out_lst[0]
def train_net(self):
s,a,r,s_prime,done_mask, prob_a, (h1_in, h2_in), (h1_out, h2_out) = self.make_batch()
first_hidden = (h1_in.detach(), h2_in.detach())
second_hidden = (h1_out.detach(), h2_out.detach())
for i in range(K_epoch):
v_prime = self.v(s_prime, second_hidden).squeeze(1)
td_target = r + gamma * v_prime * done_mask
v_s = self.v(s, first_hidden).squeeze(1)
delta = td_target - v_s
delta = delta.detach().numpy()
advantage_lst = []
advantage = 0.0
for item in delta[::-1]:
advantage = gamma * lmbda * advantage + item[0]
advantage_lst.append([advantage])
advantage_lst.reverse()
advantage = torch.tensor(advantage_lst, dtype=torch.float)
pi, _ = self.pi(s, first_hidden)
pi_a = pi.squeeze(1).gather(1,a)
            ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))  # a/b == exp(log(a) - log(b))
surr1 = ratio * advantage
surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * advantage
loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(v_s, td_target.detach())
self.optimizer.zero_grad()
loss.mean().backward(retain_graph=True)
self.optimizer.step()
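# Note on the action space: instead of one head per MultiDiscrete dimension,
# the policy above uses a single 8-way head over all (direction, multiplier)
# pairs, so a plain Categorical can be sampled. A flat index a decodes back
# to the MultiDiscrete pair as [a // 4, a % 4] -- this illustrative helper is
# equivalent to what main() does inline:
def decode_action(a):
    """Flat index in [0, 8) -> [direction in {0, 1}, multiplier index in {0..3}]."""
    return np.array([a // 4, a % 4])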
def main():
#env = gym.make('CartPole-v1')
env = CustomCartpole({'override_actions': False})
model = PPO()
score = 0.0
print_interval = 20
for n_epi in range(10000):
h_out = (torch.zeros([1, 1, 32], dtype=torch.float), torch.zeros([1, 1, 32], dtype=torch.float))
s = env.reset()
done = False
while not done:
for t in range(T_horizon):
h_in = h_out
prob, h_out = model.pi(torch.from_numpy(s).float(), h_in)
prob = prob.view(-1)
m = Categorical(prob)
                a = m.sample().item()  # flat action index in [0, 8)
                # decode the flat index into the MultiDiscrete pair [direction, multiplier]
                s_prime, r, done, info = env.step(np.array([a // 4, a % 4]))
model.put_data((s, a, r/100.0, s_prime, prob[a].item(), h_in, h_out, done))
s = s_prime
score += r
if done:
break
model.train_net()
if n_epi%print_interval==0 and n_epi!=0:
print("# of episode :{}, avg score : {:.1f}".format(n_epi, score/print_interval))
score = 0.0
env.close()
if __name__ == '__main__':
main()
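If you would rather keep the MultiDiscrete([2, 4]) structure explicit, an alternative is a factored policy: one softmax head per action dimension, sampled independently, with the joint log-probability being the sum of the two factors (that sum is what the PPO ratio would use). A minimal sketch, assuming the same 32-dim LSTM features as above; FactoredPi is hypothetical and not wired into the code:

class FactoredPi(nn.Module):
    """One Categorical head per MultiDiscrete dimension."""
    def __init__(self, in_dim=32):
        super().__init__()
        self.fc_dir = nn.Linear(in_dim, 2)  # push left / push right
        self.fc_mag = nn.Linear(in_dim, 4)  # force multiplier 1..4

    def forward(self, x):
        return Categorical(logits=self.fc_dir(x)), Categorical(logits=self.fc_mag(x))

# Usage sketch:
# dist_dir, dist_mag = FactoredPi()(lstm_features)
# a_dir, a_mag = dist_dir.sample(), dist_mag.sample()
# action = np.array([a_dir.item(), a_mag.item()])
# log_prob = dist_dir.log_prob(a_dir) + dist_mag.log_prob(a_mag)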