I followed a YouTube tutorial to implement this PPO agent. I referred to his code directly on his GitHub, and I also looked through other people's similar implementations. I am not sure why it does not work, in the sense that the agent does not learn :(. Below is my agent class.
import numpy as np
import torch as T

# ActorNetwork and CriticNetwork (not shown) and Replay_Buffer (below) live in the same file.

class Agent:
    def __init__(self, input_dims, n_actions, gamma=0.99, alpha=0.00003, gae_lambda=0.95,
                 policy_clip=0.2, buffer_size=64, batch_size=10, num_batches=5):
        self.gamma = gamma
        self.alpha = alpha
        self.gae_lambda = gae_lambda
        self.policy_clip = policy_clip
        self.actor = ActorNetwork(input_dims, n_actions, alpha)
        self.critic = CriticNetwork(input_dims, alpha)
        self.buffer = Replay_Buffer(buffer_size=buffer_size, batch_size=batch_size,
                                    num_batches=num_batches)

    def take_action(self, state):
        # add a batch dimension and move the observation to the network's device
        state = T.FloatTensor([state]).to(self.actor.device)
        action_distribution = self.actor(state)
        value = self.critic(state)
        # sample an action and record its log-probability and the critic's value estimate
        action = action_distribution.sample()
        log_probs = T.squeeze(action_distribution.log_prob(action)).item()
        action = T.squeeze(action).item()
        value = T.squeeze(value).item()
        # entropy = T.squeeze(action_distribution.entropy()).item()
        return action, log_probs, value  # , entropy

    def update(self):
        data = self.buffer.sample()
        for sub_data in data:
            state, action, reward, done, value, log_prob = sub_data

            # per-step advantage as a discounted sum of one-step TD errors (GAE-style)
            advantage = np.zeros(len(reward), dtype=np.float32)
            for t in range(len(reward)):
                discount = 1
                a_t = 0
                for k in range(len(reward) - 1, t):
                    a_t += discount * (reward[k] + (1 - done[k]) * self.gamma * value[k + 1] - value[k])
                    discount *= self.gamma * self.gae_lambda
                advantage[t] += a_t
            advantage = T.FloatTensor(advantage).to(self.actor.device)

            state = T.FloatTensor(state).to(self.actor.device)
            action = T.LongTensor(action).to(self.actor.device)
            log_prob = T.FloatTensor(log_prob).to(self.actor.device)
            value = T.FloatTensor(value).to(self.actor.device)

            critic_value = self.critic(state)
            critic_value = T.squeeze(critic_value)

            # clipped surrogate objective
            dist = self.actor(state)
            new_log_prob = dist.log_prob(action)
            prob_ratio = new_log_prob.exp() / log_prob.exp()
            weighted_adv = prob_ratio * advantage
            weighted_clipped_adv = T.clamp(prob_ratio, 1 - self.policy_clip, 1 + self.policy_clip) * advantage
            actor_loss = -T.min(weighted_adv, weighted_clipped_adv).mean()

            # bootstrapped return target for the critic
            returns = advantage + value
            critic_loss = ((returns - critic_value) ** 2).mean()

            entropy = dist.entropy().mean()
            total_loss = actor_loss + 0.5 * critic_loss + 0.1 * entropy

            self.actor.optimizer.zero_grad()
            self.critic.optimizer.zero_grad()
            total_loss.backward()
            self.actor.optimizer.step()
            self.critic.optimizer.step()
        self.buffer.buffer = []
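For reference, the nested loop in update is meant to compute a truncated generalized advantage estimate, with $\gamma$ = gamma and $\lambda$ = gae_lambda:

$$\hat{A}_t \;=\; \sum_{k \ge t} (\gamma\lambda)^{\,k-t}\,\delta_k, \qquad \delta_k \;=\; r_k + \gamma\,(1-\mathrm{done}_k)\,V(s_{k+1}) - V(s_k),$$

and returns = advantage + value is then used as the regression target for the critic.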
Sorry for bluntly dumping code, but I don't know what other details to add. Something else that may help with debugging is my version of the replay buffer, below. The agent cannot even solve CartPole, one of the simplest environments.
class Replay_Buffer:
    def __init__(self, buffer_size, batch_size, num_batches, seed=0):
        self.buffer = []
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.num_batches = num_batches
        self.rng = np.random.RandomState(seed)

    def append(self, state, action, reward, done, value, log_prob):
        self.buffer.append([state, action, reward, done, value, log_prob])
        # drop the oldest transition once the buffer is full
        if len(self.buffer) > self.buffer_size:
            self.buffer.pop(0)

    def sample(self):
        # pick num_batches random start indices and take contiguous slices of length batch_size
        batch_starts = self.rng.choice(len(self.buffer) - self.batch_size + 1, self.num_batches, replace=False)
        batches = [self.buffer[i:i + self.batch_size] for i in batch_starts]
        # transpose each batch into stacked arrays of states, actions, rewards, dones, values, log_probs
        return [map(np.stack, zip(*batch)) for batch in batches]

    def get_buffer_size(self):
        return len(self.buffer)
I tried different learning hyperparameters as well, but still no luck. In case it helps, below is roughly how I wire the agent and buffer together in the training loop.
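This is a simplified sketch rather than my exact script: it assumes the Gymnasium CartPole-v1 API (env.step returning terminated/truncated separately) and an integer input_dims, and the episode count and logging are just illustrative.

import gymnasium as gym

env = gym.make("CartPole-v1")
agent = Agent(input_dims=env.observation_space.shape[0], n_actions=env.action_space.n)

for episode in range(500):
    state, _ = env.reset()
    done = False
    score = 0
    while not done:
        # act, then store the transition together with the old log-prob and value estimate
        action, log_prob, value = agent.take_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        agent.buffer.append(state, action, reward, done, value, log_prob)
        # run a PPO update (which also clears the buffer) once enough transitions are stored
        if agent.buffer.get_buffer_size() >= agent.buffer.buffer_size:
            agent.update()
        state = next_state
        score += reward
    print(f"episode {episode}, score {score}")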