I’m working on implementing a reinforcement learning (RL) environment for a Pong game using OpenAI’s Gym. The goal is to train an AI agent to play Pong by controlling the paddle.
The agent receives too many negative rewards, even when it seems to be moving correctly. Specifically, the reward function penalizes the agent for moving away from the ball, but this penalty fires far too often and seems to trigger even while the ball is moving towards the paddle and the agent appears to be positioning itself correctly.
Observation Space: The observation space includes the positions of the ball and both paddles, the ball's velocity, the vertical distance between the ball and each paddle, and the player paddle's speed.
Action Space: The action space consists of three discrete actions: move up, move down, and stay still.
Reward Function: The reward function rewards the agent for hitting the ball and penalizes it for missing the ball or moving away from it.
How can I adjust the reward function so that it only penalizes the agent for moving away from the ball while the ball is moving towards the paddle? Are there any other problems with my game's logic or my DQN agent implementation? Any suggestions on improving the training performance would be greatly appreciated.
Following is my code for the DQN agent:
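Roughly, this is the kind of shaping rule I'm imagining, written out as a standalone helper (untested sketch; the function name and the quarter-paddle-height alignment threshold are just placeholders, not something already in my code):
<code># Sketch of the shaping rule I'm considering (untested, not yet wired into step()).
def shaping_reward(action, paddle_centery, ball_centery, ball_dx, paddle_height):
    """Small shaping term, applied only while the ball travels towards the player paddle."""
    if ball_dx <= 0:
        return 0.0  # Ball moving away from the player paddle: no shaping at all
    moving_towards_ball = ((action == 0 and paddle_centery > ball_centery) or
                           (action == 1 and paddle_centery < ball_centery))
    already_aligned = abs(paddle_centery - ball_centery) < paddle_height // 4
    if moving_towards_ball:
        return 0.5
    if already_aligned:
        return 0.0  # Don't punish holding position when the paddle is already lined up
    return -0.5
</code>
Is something along these lines the right way to do it, or is there a cleaner approach?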
<code>import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras.optimizers import Adam
import numpy as np
import random
from collections import deque


class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=10000)  # Increased replay memory size
        self.gamma = 0.99                  # Increased discount rate
        self.epsilon = 1.0                 # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995         # Slower epsilon decay
        self.learning_rate = 0.001         # Slightly increased learning rate
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()

    def _build_model(self):
        model = Sequential()
        model.add(Input(shape=(self.state_size,)))  # Input layer defines the input shape
        model.add(Dense(48, activation='relu'))     # Increased neurons to 48
        model.add(Dropout(0.2))                     # Dropout layer for regularization
        model.add(Dense(48, activation='relu'))
        model.add(Dropout(0.2))
        model.add(Dense(48, activation='relu'))
        model.add(Dropout(0.2))
        model.add(Dense(48, activation='relu'))
        model.add(Dropout(0.2))
        model.add(Dense(48, activation='relu'))
        model.add(Dropout(0.2))
        model.add(Dense(48, activation='relu'))
        model.add(Dropout(0.2))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.randint(self.action_size)
        else:
            act_values = self.model.predict(state)
            return np.argmax(act_values[0])

    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return 0  # Not enough memory to sample
        minibatch = random.sample(self.memory, batch_size)
        total_loss = 0
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = (reward + self.gamma *
                          np.amax(self.target_model.predict(next_state)[0]))
            target_f = self.model.predict(state)
            target_f[0][action] = target
            history = self.model.fit(state, target_f, epochs=1, verbose=0)
            total_loss += history.history['loss'][0]
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        return total_loss / batch_size  # Return average loss

    def update_target_model(self):
        # Copy weights from model to target_model
        self.target_model.set_weights(self.model.get_weights())

    def save(self, filename):
        self.model.save(f"{filename}.keras")

    def load(self, filename):
        self.model = load_model(f"{filename}")
        self.target_model = self._build_model()
        self.update_target_model()
</code>
Following is my code for the Pong game logic:
<code>import gym
from gym import spaces
import numpy as np
import pygame
from pygame.locals import K_w, K_s


class PongEnv(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super(PongEnv, self).__init__()
        self.width = 640
        self.height = 480
        self.ball_speed = 7
        self.paddle_speed = 12
        self.ball = pygame.Rect(self.width // 2 - 15, self.height // 2 - 15, 30, 30)
        self.player_paddle = pygame.Rect(self.width - 20, self.height // 2 - 70, 10, 140)
        self.ai_paddle = pygame.Rect(10, self.height // 2 - 70, 10, 140)
        self.ball_dx, self.ball_dy = self.ball_speed, self.ball_speed
        self.player_paddle_speed = 0  # Initialize player paddle speed
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(low=0, high=255, shape=(13,), dtype=np.float32)  # Updated shape

    def reset(self):
        self.ball.center = (self.width // 2, self.height // 2)
        self.player_paddle.centery = self.height // 2
        self.ai_paddle.centery = self.height // 2
        self.ball_dx, self.ball_dy = self.ball_speed, self.ball_speed
        self.player_paddle_speed = 0  # Reset player paddle speed
        return self.get_state()

    def get_state(self):
        distance_ball_player = self.ball.centery - self.player_paddle.centery
        distance_ball_ai = self.ball.centery - self.ai_paddle.centery
        state = [
            self.player_paddle.centery,
            self.ai_paddle.centery,
            self.ball.centerx,
            self.ball.centery,
            self.ball_dx,
            self.ball_dy,           # Ball's vertical speed
            distance_ball_player,   # Distance between ball and player paddle
            distance_ball_ai,       # Distance between ball and AI paddle
            self.player_paddle.top,
            self.player_paddle.bottom,
            self.ai_paddle.top,
            self.ai_paddle.bottom,
            self.player_paddle_speed,  # Player paddle speed
        ]
        return np.array(state, dtype=np.float32)

    def step(self, action):
        reward = 0
        done = False
        previous_player_paddle_position = self.player_paddle.centery  # Track previous position
        # Move player paddle
        if action == 0:
            self.player_paddle.centery -= self.paddle_speed
        elif action == 1:
            self.player_paddle.centery += self.paddle_speed
        # Ensure paddle stays within the screen
        if self.player_paddle.top < 0:
            self.player_paddle.top = 0
        if self.player_paddle.bottom > self.height:
            self.player_paddle.bottom = self.height
        # Calculate player paddle speed
        self.player_paddle_speed = self.player_paddle.centery - previous_player_paddle_position
        # Move the ball
        self.ball.x += self.ball_dx
        self.ball.y += self.ball_dy
        # Ball collision with top or bottom
        if self.ball.top <= 0 or self.ball.bottom >= self.height:
            self.ball_dy *= -1
        # Ball collision with paddles
        if self.ball.colliderect(self.player_paddle):
            self.ball_dx *= -1
            reward += 5  # Reward for hitting the ball with the paddle
            print(f"Ball hit by player paddle. Reward: {reward}")
        elif self.ball.colliderect(self.ai_paddle):
            self.ball_dx *= -1
        # Check for out of bounds
        if self.ball.left <= 0:
            done = True
            reward += 10  # Reward for scoring
            print(f"Ball out of left bounds. Reward: {reward}")
        elif self.ball.right >= self.width:
            done = True
            reward -= 10  # Penalty for opponent scoring
            print(f"Ball out of right bounds. Penalty: {reward}")
        # AI paddle movement
        if self.ai_paddle.centery < self.ball.centery:
            self.ai_paddle.centery += self.paddle_speed
        if self.ai_paddle.centery > self.ball.centery:
            self.ai_paddle.centery -= self.paddle_speed
        # Ensure AI paddle stays within the screen
        if self.ai_paddle.top < 0:
            self.ai_paddle.top = 0
        if self.ai_paddle.bottom > self.height:
            self.ai_paddle.bottom = self.height
        # Reward for moving towards the ball and penalty for moving away
        if self.ball_dx > 0:  # Ball moving towards the player paddle
            if (action == 0 and self.player_paddle.centery > self.ball.centery) or (action == 1 and self.player_paddle.centery < self.ball.centery):
                reward += 0.5
                print(f"Player paddle moving towards ball. Reward: {reward}")
            else:
                reward -= 0.5  # Penalty for moving away from the ball
                print(f"Player paddle moving away from ball. Penalty: {reward}")
        next_state = self.get_state()
        return next_state, reward, done, {}

    def render(self, mode='human'):
        if mode == 'human':
            pygame.init()
            screen = pygame.display.set_mode((self.width, self.height))
            screen.fill((0, 0, 0))
            pygame.draw.rect(screen, (255, 255, 255), self.player_paddle)
            pygame.draw.rect(screen, (255, 255, 255), self.ai_paddle)
            pygame.draw.ellipse(screen, (255, 255, 255), self.ball)
            pygame.draw.aaline(screen, (255, 255, 255), (self.width // 2, 0), (self.width // 2, self.height))
            pygame.display.flip()
</code>
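For context, this is a simplified sketch of how I wire the agent and the environment together during training (the episode count, batch size, and once-per-episode target update shown here are placeholder values, not my exact settings):
<code># Simplified training loop (placeholder hyperparameters).
env = PongEnv()
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)
batch_size = 64

for episode in range(500):
    state = env.reset().reshape(1, state_size)  # Model expects a batch dimension
    done = False
    total_reward = 0
    while not done:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        next_state = next_state.reshape(1, state_size)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
        agent.replay(batch_size)
    agent.update_target_model()  # Sync target network once per episode
    print(f"Episode {episode}, total reward: {total_reward}")
</code>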