I have a task where I need to implement the R-max algorithm with modified policy iteration on the FrozenLake problem.
I first implemented a regular modified policy iteration and it worked, but when I adjusted it to the R-max requirements (using a fake model with a fake state), it no longer converges to the correct policy.
Unfortunately, I am also not sure where exactly the problem is.
I would really appreciate it if you could look over my code and maybe spot a few mistakes.
import numpy as np


class Rmax:
    def __init__(self, env, r_max=2.5, gamma=0.95, k=20, num_iterations=5, epsilon=1e-6, iterations=30000):
        # init the model's basic data structures
        self.env = env
        self.num_states = env.observation_space.n + 1
        self.num_actions = env.action_space.n
        self.r_max = r_max
        self.gamma = gamma
        self.k = k
        self.num_iterations = num_iterations
        self.epsilon = epsilon
        self.iterations = iterations
        # init the fake Tr (transition) table
        self.Tr = np.ones((self.num_states, self.num_actions, self.num_states))
        for state in range(self.num_states - 1):
            for action in range(self.num_actions):
                for next_state in range(self.num_states - 1):
                    self.Tr[state][action][next_state] = 1 / (self.num_states - 1)  # we assume uniform transitions over all real states
        self.rewards = np.zeros((self.num_states, self.num_actions))
        for action in range(self.num_actions):
            self.rewards[self.num_states - 1][action] = self.r_max  # reward for the fake state is r_max, all other rewards are 0
        # counters
        self.state_action_counter = np.zeros((self.num_states, self.num_actions), dtype=int)
        self.state_action_state_counter = np.zeros((self.num_states, self.num_actions, self.num_states), dtype=int)
        # known table
        self.known = np.zeros((self.num_states, self.num_actions), dtype=bool)
        self.policy = np.zeros(self.num_states, dtype=int)
        # and 0 for all values
        self.values = np.ones(self.num_states)

    def train(self):
        # training loop: solve the model, discover new states, and solve again
        for i in range(self.iterations):
            print(f'iteration: {i}/{self.iterations}')
            self.solve_model()

    def solve_model(self):
        # solving the model with MPI requires two steps: policy evaluation and policy improvement
        self.policy_eval()
        stable = self.policy_improvement()
        # now we run the policy until a (state, action) pair becomes known
        changed = False
        done, truncated = False, False
        while not changed and not done and not truncated:
            state = self.env.reset()[0]
            action = self.policy[state]
            next_state, reward, done, truncated, info = self.env.step(action)
            self.state_action_counter[state][action] += 1
            self.state_action_state_counter[state][action][next_state] += 1
            if self.state_action_counter[state][action] >= self.k:
                self.known[state][action] = True
                changed = True
                self.update_model()

    def update_model(self):
        for state in range(self.num_states):
            for action in range(self.num_actions):
                if self.known[state][action]:
                    for next_state in range(self.num_states):
                        if self.state_action_state_counter[state][action][next_state] != 0:
                            self.Tr[state][action][next_state] = (
                                self.state_action_state_counter[state][action][next_state]
                                / self.state_action_counter[state][action])
                else:
                    self.Tr[state][action] = np.ones(self.num_states) / self.num_states

    def policy_eval(self):
        # in MPI we do only a limited number of policy evaluation sweeps
        for _ in range(self.num_iterations):
            new_values = self.values.copy()
            for state in range(self.num_states):
                action = self.policy[state]
                new_value = 0
                for next_state in range(self.num_states):
                    prob = self.Tr[state][action][next_state]
                    reward = self.rewards[state][action]
                    new_value += prob * (reward + self.gamma * self.values[next_state])
                new_values[state] = new_value
            # also, if the values have converged we can stop early
            if np.max(np.abs(new_values - self.values)) < self.epsilon:
                self.values = new_values
                break
            self.values = new_values

    def policy_improvement(self):
        stable = True
        for state in range(self.num_states):
            old_action = self.policy[state]
            action_values = np.zeros(self.num_actions)
            for action in range(self.num_actions):
                for next_state in range(self.num_states):
                    prob = self.Tr[state][action][next_state]
                    reward = self.rewards[state][action]
                    action_values[action] += prob * (reward + self.gamma * self.values[next_state])
            best_action = np.argmax(action_values)
            self.policy[state] = best_action
            if old_action != best_action:
                stable = False
        return stable
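For context, this is roughly how I construct and run the agent (I'm assuming gymnasium's FrozenLake-v1 here; the 4x4 map and is_slippery setting are just what I've been testing with, not part of the class itself):

import gymnasium as gym

# hypothetical driver, only to show how the class above is called
env = gym.make("FrozenLake-v1", is_slippery=True)
agent = Rmax(env)
agent.train()

# the last state is the fake R-max state, so I only look at the real ones
print(agent.policy[:-1].reshape(4, 4))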
I tried playing with the parameter values and going over the algorithm again, but unfortunately I couldn't identify the problem.