I'm implementing the REINFORCE algorithm to solve Gym environments. I tried it on a discrete action space and everything seems fine, but when I try it on a continuous action space my agent doesn't learn. What I don't understand is that only one function (and one network) differs between the discrete and the continuous case.
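For context, my understanding is that the REINFORCE update itself is the same in both cases; only the form of the policy changes (a categorical distribution in the discrete case, a Normal distribution in the continuous one):

$$\nabla_\theta J(\theta) \approx \sum_t G_t \, \nabla_\theta \log \pi_\theta(a_t \mid s_t)$$

which is why I expected the same class to work once choose_action is swapped.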
def choose_action_softmax(self, state):
    probabilities = F.softmax(self.NN_actor(state), dim=0)
    a = T.multinomial(probabilities, num_samples=1, replacement=True).squeeze()
    self.log_prob.append(T.log(probabilities[a]))
    return a.detach().numpy()

def choose_action_normal(self, state):
    distrib_parameters = self.NN_actor(state)
    mu = distrib_parameters[::2]
    sigma = F.softplus(distrib_parameters[1::2]) + 1e-6
    normal = T.distributions.Normal(mu, sigma)
    a = 2 * T.tanh(normal.sample())  # the *2 is only for my test because my action space is [-2, 2]
    log_prob = T.sum(normal.log_prob(a))
    self.log_prob.append(log_prob)
    return a.detach().numpy()
Top: discrete action space, bottom: continuous action space.
I tried various learning rates without success.
I really think the problem comes from my choose_action_normal, because the class works for the discrete action space.
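For reference, here is a small self-contained sketch of the Normal-policy sampling step as I understand it, with the log-probability taken of the raw, unsquashed sample and a simple clamp to the Box bounds instead of my 2*tanh trick. The toy linear network, the 2-dimensional state, and the [-2, 2] bound are placeholders for illustration, not my real setup:

import torch as T
import torch.nn.functional as F

def sample_normal_action(net, state, action_low=-2.0, action_high=2.0):
    # the network outputs interleaved parameters: [mu_0, s_0, mu_1, s_1, ...]
    out = net(state)
    mu = out[::2]
    sigma = F.softplus(out[1::2]) + 1e-6            # keep the std strictly positive
    dist = T.distributions.Normal(mu, sigma)
    raw = dist.sample()                              # raw, unbounded sample
    log_prob = dist.log_prob(raw).sum()              # log-prob of the raw sample
    action = T.clamp(raw, action_low, action_high)   # keep the action inside the Box bounds
    return action.detach().numpy(), log_prob

# toy usage: placeholder linear "network" for a 2-D state and 1 action (mu, sigma)
net = T.nn.Linear(2, 2)
state = T.tensor([0.1, -0.3])
action, log_prob = sample_normal_action(net, state)
print(action, log_prob)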
Ty for reading 🙂
Here is the full code of the class if you want to take a look:
class PolicyGradient:
    def __init__(self, env, lr=0.001):
        self.env = env
        self.gamma = 1
        state_dims = np.prod(env.observation_space.shape)
        # action space class verification
        action_class = env.action_space
        self.policy_used = self.check_action_space(action_class)
        if self.policy_used == "Softmax":
            self.NN_actor = Neural_Network(lr, input_dims=state_dims, output_dims=action_class.n)
        else:
            self.NN_actor = Neural_Network(lr, input_dims=state_dims, output_dims=action_class.shape[0] * 2)  # *2: for each action we have a mean and a variance
        self.ep_reward_history = []
        self.log_prob = []
        return

    def choose_action(self, state):
        if self.policy_used == "Softmax":
            a = self.choose_action_softmax(state)
        else:
            a = self.choose_action_normal(state)
        return a

    def choose_action_softmax(self, state):
        probabilities = F.softmax(self.NN_actor(state), dim=0)
        a = T.multinomial(probabilities, num_samples=1, replacement=True).squeeze()
        self.log_prob.append(T.log(probabilities[a]))
        return a.detach().numpy()

    def choose_action_normal(self, state):
        distrib_parameters = self.NN_actor(state)
        mu = distrib_parameters[::2]
        sigma = F.softplus(distrib_parameters[1::2]) + 1e-6
        normal = T.distributions.Normal(mu, sigma)
        a = 2 * T.tanh(normal.sample())  # the *2 is only for my test because my action space is [-2, 2]
        log_prob = T.sum(normal.log_prob(a))
        self.log_prob.append(log_prob)
        return a.detach().numpy()

    def check_action_space(self, action_class):
        if isinstance(action_class, gym.spaces.Discrete):
            policy = "Softmax"
        elif isinstance(action_class, gym.spaces.Box):
            policy = "Normal"
        else:
            print("Unknown action_space")
            sys.exit()
        return policy

    def discounted_reward(self, normalize=False):
        reward_history = self.ep_reward_history
        total_reward = []
        discounted_sum = 0
        for reward in reward_history[::-1]:
            discounted_sum = reward + self.gamma * discounted_sum
            total_reward.insert(0, discounted_sum)
        if normalize is True:
            total_reward = np.array(total_reward)
            total_reward = (total_reward - np.mean(total_reward)) / (np.std(total_reward) + 1e-7)
            total_reward = list(total_reward)
        return total_reward

    def learn(self):
        G = T.tensor(self.discounted_reward(normalize=True))
        log_prob = T.stack(self.log_prob).reshape(-1)
        loss = T.mean(-log_prob * G)
        # gradient step
        self.NN_actor.optimiser.zero_grad()
        loss.backward()
        self.NN_actor.optimiser.step()
        self.log_prob = []
        self.ep_reward_history = []

    def train(self, nb_episode, score_wanted=2**32):
        # lists to track rewards and to compute a moving average of them
        reward_history = []
        moving_average = []
        for ep in tqdm(range(nb_episode)):
            observation = self.env.reset()[0]
            done = False
            score = 0
            while not done:
                action = self.choose_action(observation)
                observation_, reward, terminated, truncated, info = self.env.step(action)
                done = terminated or truncated
                score += reward
                self.ep_reward_history.append(reward)
                observation = observation_
            self.learn()
            reward_history.append(score)
            # REWARD PLOT
            window_average = 1000
            if len(reward_history) >= window_average:
                moving_average.append(np.mean(reward_history[-window_average:]))
                if moving_average[-1] >= score_wanted:
                    print(moving_average[-1])
                    print("finish")
                    break
            if ((ep + 1) % (nb_episode // 20)) == 0:
                clear_output(wait=True)
                x = np.arange(window_average, len(moving_average) + window_average)
                plt.plot(x, moving_average)
                plt.show()
        # finish
        return
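For completeness, a minimal example of how the class above can be called on a continuous-action env. Pendulum-v1, the learning rate, and the episode count here are just placeholder choices for illustration, and it assumes the class and my Neural_Network are already defined:

import gym

env = gym.make("Pendulum-v1")          # example continuous env with Box(-2, 2) actions
agent = PolicyGradient(env, lr=0.001)  # placeholder learning rate
agent.train(nb_episode=5000)           # placeholder episode count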