Implementation guidance for "Mobile Robot Path Planning in Dynamic Environments Through Globally Guided Reinforcement Learning"

I have been implementing a mobile robot control paper that guides an agent through a dynamic environment in which obstacles follow unknown trajectories. I mostly followed the paper's explanations (paper), but my model doesn't seem to converge. I trained both a DQN and a PPO agent for around 100 thousand episodes and have been debugging potential issues for quite a while, but nothing has yielded good results. I'm a second-year BSc student who is still exploring reinforcement learning concepts, so any advice would be much appreciated. My questions are mainly about implementation structure and initialization: am I making a rookie mistake in the base code, or did I misunderstand something? In particular, are my MaskPPO agent and my CNN model implemented correctly? My git repo with the full project is reachable here: repo

If anyone takes the time to have a look, it would be much appreciated.

MaskPPO agent:

class MaskPPOAgent:
    def __init__(self, env, model, device='cpu', batch_size=32, mini_batch_size=8, epochs=4, gamma=0.99, gae_lambda=0.95, clip_epsilon=0.2, c1=0.5, c2=0.01, lr=3e-4):
        self.env = env
        self.model = model
        self.device = device
        self.batch_size = batch_size
        self.mini_batch_size = mini_batch_size
        self.epochs = epochs
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_epsilon = clip_epsilon
        self.c1 = c1  # Value function coefficient
        self.c2 = c2  # Entropy coefficient
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr, eps=1e-5)
        self.replay_buffer = PrioritizedReplayBuffer(capacity=10000)
        
        # Logging
        self.policy_losses = []
        self.value_losses = []
        self.entropies = []
        self.kl_divs = []

        self.debug = 1

    def select_action(self, state):
        if self.debug:
            print("Debug: Selecting action")
            print(f"Debug: Input state shape: {state.shape}")
        
        state = torch.from_numpy(state).float().to(self.device)
        with torch.no_grad():
            action_logits, state_value = self.model(state, return_value=True)
            action_probs = torch.softmax(action_logits, dim=-1)

            mask = self.env.get_action_mask(self.device)
            if self.debug:
                print(f"Debug: Action probabilities before masking: {action_probs}")
                print(f"Debug: Action mask: {mask}")
            
            action_probs = action_probs * mask
            
            if self.debug:
                print(f"Debug: Action probabilities after masking: {action_probs}")

            if action_probs.sum() > 0:
                action_probs = action_probs / action_probs.sum(dim=-1, keepdim=True)
                if self.debug:
                    print(f"Debug: Action probabilities after probability sum: {action_probs}")
                action_distribution = torch.distributions.Categorical(action_probs)
                if self.debug:
                    print(f"Debug: Action distribution: {action_distribution}")
                action = action_distribution.sample()
                log_prob = action_distribution.log_prob(action)
            else:
                action = torch.tensor(4, device=self.device)
                log_prob = torch.tensor(0.0, device=self.device)

            if self.debug:
                print(f"Debug: Action selected: {action.item()}")
                print(f"Debug: Log probability: {log_prob.item()}")
                print(f"Debug: State value: {state_value.item()}")

        return action.item(), log_prob.item(), state_value.item()

    def store(self, state, action, reward, next_state, done, log_prob, value):
        experience = (state, action, reward, next_state, done, log_prob, value)
        self.replay_buffer.add(experience)

    def update(self, states, actions, rewards, next_states, dones, log_probs, values):
        if self.debug:
            print("Debug: Starting update")

        advantages, returns = self.compute_advantages_and_returns(rewards, values, dones)
        if self.debug:
            print(f"Debug: Advantages: {advantages}, returns: {returns}")

        states = torch.from_numpy(np.array(states)).float().to(self.device)
        actions = torch.from_numpy(np.array(actions)).long().to(self.device)
        old_log_probs = torch.from_numpy(np.array(log_probs)).float().to(self.device)
        advantages = torch.from_numpy(advantages).float().to(self.device)
        returns = torch.from_numpy(returns).float().to(self.device)
        old_values = torch.from_numpy(np.array(values)).float().to(self.device)

        states = states.squeeze(1)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        if self.debug:
            print(f"Debug: Normalized advantages mean: {advantages.mean()}, std: {advantages.std()}")

        action_logits, state_values = self.model(states, return_value=True)
        new_log_probs = torch.distributions.Categorical(torch.softmax(action_logits, dim=-1)).log_prob(actions)
        entropy = torch.distributions.Categorical(torch.softmax(action_logits, dim=-1)).entropy().mean()

        ratio = torch.exp(new_log_probs - old_log_probs)
        surrogate1 = ratio * advantages
        surrogate2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages
        policy_loss = -torch.min(surrogate1, surrogate2).mean()

        values_clipped = old_values + (state_values - old_values).clamp(-self.clip_epsilon, self.clip_epsilon)
        value_loss1 = (state_values - returns).pow(2)
        value_loss2 = (values_clipped - returns).pow(2)
        value_loss = 0.5 * torch.max(value_loss1, value_loss2).mean()

        loss = policy_loss + self.c1 * value_loss - self.c2 * entropy

        if self.debug:
            print(f"Debug: Policy loss: {policy_loss.item()}")
            print(f"Debug: Value loss: {value_loss.item()}")
            print(f"Debug: Entropy: {entropy.item()}")
            print(f"Debug: Total loss: {loss.item()}")

        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=0.5)
        self.optimizer.step()

        # Logging
        self.policy_losses.append(policy_loss.item())
        self.value_losses.append(value_loss.item())
        self.entropies.append(entropy.item())
        approx_kl_div = ((old_log_probs - new_log_probs) ** 2).mean().item()
        self.kl_divs.append(approx_kl_div)

        if self.debug:
            print(f"Debug: Approximate KL divergence: {approx_kl_div}")
            print("Debug: Update completed")

    def compute_advantages_and_returns(self, rewards, values, dones):
        advantages = np.zeros_like(rewards)
        returns = np.zeros_like(rewards)
        last_gae_lam = 0
        
        for t in reversed(range(len(rewards))):
            if t == len(rewards) - 1:
                next_non_terminal = 1.0 - dones[t]
                next_value = 0
            else:
                next_non_terminal = 1.0 - dones[t + 1]
                next_value = values[t + 1]
            
            delta = rewards[t] + self.gamma * next_value * next_non_terminal - values[t]
            advantages[t] = last_gae_lam = delta + self.gamma * self.gae_lambda * next_non_terminal * last_gae_lam

        # Handle the last timestep separately
        advantages[-1] = rewards[-1] - values[-1]

        returns = advantages + values
        return advantages, returns

    def replay_buffer_update(self):
        if len(self.replay_buffer) < self.batch_size:
            return

        states, actions, rewards, next_states, dones, log_probs, values, _, _ = self.replay_buffer.sample(self.batch_size)
        self.update(states, actions, rewards, next_states, dones, log_probs, values)

    def adjust_learning_rate(self, step, total_steps):
        lr = 3e-4 * (1 - step / total_steps)
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = lr

    def get_logs(self):
        logs = {
            'policy_loss': np.mean(self.policy_losses),
            'value_loss': np.mean(self.value_losses),
            'entropy': np.mean(self.entropies),
            'approx_kl_div': np.mean(self.kl_divs)
        }
        self.policy_losses = []
        self.value_losses = []
        self.entropies = []
        self.kl_divs = []
        return logs
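
For context, here is a simplified sketch of how the agent's methods are meant to fit together per episode (the actual training loop lives in the repo and may differ; the run_episode helper and the env.reset()/env.step() signatures below are only illustrative):

    # Simplified, illustrative sketch only -- the real loop is in the repo.
    # `run_episode` is a hypothetical helper; env.reset()/env.step() signatures are assumed.
    def run_episode(agent, env):
        state = env.reset()                      # initial observation stack
        done = False
        episode_reward = 0.0
        while not done:
            action, log_prob, value = agent.select_action(state)
            next_state, reward, done, info = env.step(action)
            agent.store(state, action, reward, next_state, done, log_prob, value)
            state = next_state
            episode_reward += reward
        agent.replay_buffer_update()             # sample a batch and run one PPO update
        return episode_reward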

CNN model class:

class CNNLSTMModel(nn.Module):
    def __init__(self, height=15, width=15, nt=4, nc=3, dropout_rate=0.2):
        super(CNNLSTMModel, self).__init__()
        self.nc = nc
        self.conv_blocks = nn.ModuleList()
        
        in_channels = nt
        out_channels = 32
        for _ in range(nc):
            self.conv_blocks.append(nn.Sequential(
                nn.Conv3d(in_channels, out_channels, kernel_size=(1, 3, 3), stride=(1, 1, 1), padding=(0, 1, 1)),
                nn.BatchNorm3d(out_channels),
                nn.ReLU(),
                nn.Dropout3d(dropout_rate),
                nn.Conv3d(out_channels, out_channels, kernel_size=(1, 3, 3), stride=(1, 2, 2), padding=(0, 1, 1)),
                nn.BatchNorm3d(out_channels),
                nn.ReLU(),
                nn.Dropout3d(dropout_rate)
            ))
            in_channels = out_channels
            out_channels *= 2
        
        self.lstm_input_size = 128 * 2 * 2 * 4
        
        self.lstm = nn.LSTM(input_size=self.lstm_input_size, hidden_size=512, batch_first=True)
        
        self.fc1 = nn.Linear(512, 512)
        self.fc2 = nn.Linear(512, 5)
        
        self.value_head = nn.Linear(512, 1)
        
        self.dropout = nn.Dropout(dropout_rate)
        
        self.apply(self.orthogonal_init)

    
    def forward(self, x, return_value=False):
        # print(f"Input shape before processing: {x.shape}")
        batch_size, nt, height, width, channels = x.shape

        x = x.permute(0, 4, 1, 2, 3).contiguous()  # Change to (batch_size, nt, channels, height, width)
        # print(f"Input shape after permute: {x.shape}")
        
        for conv_block in self.conv_blocks:
            x = conv_block(x)

        # print(f"Shape after conv block: {x.shape}")
        
        # Flatten the spatial dimensions and combine with the time dimension
        _, c, t, h, w = x.shape
        x = x.permute(0, 2, 1, 3, 4).contiguous()
        x = x.view(batch_size, t, -1)

        # print(f"Shape after flatten: {x.shape}")
        
        lstm_out, _ = self.lstm(x)
        lstm_out = lstm_out[:, -1, :]  # Take the last output

        # print(f"Shape after lstm block: {x.shape}")
        
        x = self.fc1(lstm_out)
        x = torch.relu(x)
        x = self.dropout(x)

        # print(f"Shape after first linear + relu: {x.shape}")
        
        action_logits = self.fc2(x)

        # print(f"Shape after second linear + relu: {x.shape}")
        
        if return_value:
            value = self.value_head(x).squeeze(-1)
            return action_logits, value
        
        return action_logits
    
    def orthogonal_init(self, module):
        if isinstance(module, (nn.Conv3d, nn.Linear)):
            nn.init.orthogonal_(module.weight, gain=np.sqrt(2))
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)
        elif isinstance(module, nn.LSTM):
            for name, param in module.named_parameters():
                if 'weight' in name:
                    nn.init.orthogonal_(param, gain=np.sqrt(2))
                elif 'bias' in name:
                    nn.init.constant_(param, 0)
        elif isinstance(module, nn.BatchNorm3d):
            nn.init.constant_(module.weight, 1)
            nn.init.constant_(module.bias, 0)

        # Recursively initialize submodules
        for child in module.children():
            self.orthogonal_init(child)

The CNN model receives a 5D input of shape:
(batch_size, time_dim, obs_width, obs_height, channels: RGB + the local part of the global guidance)

In a particular example the shape is (128, 4, 30, 30, 4): a batch of 128 is processed, with the present observation plus 3 past observations, a size-15 local field of view, and RGB + global-guidance channels.
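
As a quick shape sanity check, the snippet below pushes a dummy tensor of that example shape through the model (this assumes the CNNLSTMModel class above together with its usual torch/numpy imports; it is not part of the repo):

    import torch

    # Assumes CNNLSTMModel (and its torch.nn / numpy imports) is already defined as above.
    model = CNNLSTMModel(height=30, width=30, nt=4, nc=3)
    model.eval()  # disable dropout and use BatchNorm running stats for a deterministic pass

    dummy = torch.randn(128, 4, 30, 30, 4)  # (batch, time, height, width, channels)
    with torch.no_grad():
        logits, value = model(dummy, return_value=True)

    print(logits.shape)  # expected: torch.Size([128, 5]) -- one logit per action
    print(value.shape)   # expected: torch.Size([128])    -- one value per sample

With this 30x30 example input, the flattened per-timestep feature size after the three conv blocks works out to 128 * 4 * 4 = 2048, which matches self.lstm_input_size = 128 * 2 * 2 * 4.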

Thanks if you read this far, and huge thanks if you can provide professional advice!

Thanks,
Mark

I tried debugging the code and checking the implementations, and in my opinion (which might be completely wrong) it should be converging to an optimal policy, but even after many steps it fails to generalize:
-------------------------- 97950th episode - 1959th start-end pair --------------------------

Reward: 0.03, Computing time: 1693.26 min/50 epochs
Goal reached for start-goal pair: 0 times, Number of collisions: 0

Terminations caused by - Reached goals: 3229.0, No guidance information: 19731.0, Max steps reached: 75107.0
100 epoch Policy Loss: 0.0076
100 epoch Value Loss: 1.7975
100 epoch Entropy: 1.2725
100 epoch KL Divergence: 0.0534
