I have a game model with multiple entities that move around in two dimensions, and an action space vector whose length is 4 * number of entities.
At each step of the model I would like to move all of the entities using the action vector, instead of moving one entity at a time.
This works in my test function, but it does not work in the learning phase.
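To illustrate what I mean, here is a simplified, self-contained sketch of the per-step behaviour I am after (placeholder names only, and one direction index per entity rather than the 4 * num_entities layout my real code currently uses):

import numpy as np

num_entities = 3
positions = np.zeros((num_entities, 2), dtype=np.int32)  # (x, y) per entity
moves = {0: (0, -5), 1: (0, 5), 2: (-5, 0), 3: (5, 0)}   # up, down, left, right

action_vector = np.array([0, 3, 2])  # one direction per entity, chosen by the agent
for i, a in enumerate(action_vector):
    dx, dy = moves[int(a)]
    positions[i] += (dx, dy)         # every entity moves within the same step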
The general code is below; please suggest how I can make this work. Thank you.
import gym
from gym import spaces
import cv2
import numpy as np
import random
import itertools
from stable_baselines3.common.env_checker import check_env
from stable_baselines3 import PPO
import os
num_squares = 30
square_size = 20
models_dir = "models/square/"
log_dir = "logs/square/"
class SquareGameEnv(gym.Env):
    def __init__(self):
        super(SquareGameEnv, self).__init__()
        self.width = 800
        self.height = 600
        # Define the observation space with dtype as np.uint16
        self.observation_space = spaces.Box(low=0, high=800, shape=(num_squares * 6,), dtype=np.uint16)
        # Define the action space size
        self.action_space = spaces.Discrete(4 * num_squares)
        self.squares = []

    def random_color(self):
        colors = [(0, 255, 0), (0, 0, 255), (255, 0, 0)]  # Green, Blue, Red
        return random.choice(colors)

    def create_square(self, img):
        x = np.random.randint(0, img.shape[1] - square_size, dtype=np.uint16)
        y = np.random.randint(0, img.shape[0] - square_size, dtype=np.uint16)
        color = self.random_color()
        cv2.rectangle(img, (x, y), (x + square_size, y + square_size), color, -1)
        return (x, y, square_size, color)

    def collide(self, square1, square2, square3):
        x1, y1, size1, _ = square1
        x2, y2, size2, _ = square2
        x3, y3, size3, _ = square3
        # Calculate distances between squares
        distance1 = (x1 - x2) ** 2 + (y1 - y2) ** 2
        distance2 = (x1 - x3) ** 2 + (y1 - y3) ** 2
        distance3 = (x2 - x3) ** 2 + (y2 - y3) ** 2
        # Check if all distances are below the threshold
        if distance1 < (size1 + size2) ** 2 and distance2 < (size1 + size3) ** 2 and distance3 < (size2 + size3) ** 2:
            return True
        return False
    def step(self, actions):
        # Ensure actions is an iterable
        if not isinstance(actions, (list, tuple, np.ndarray)):
            actions = [actions]
        # Define action mapping
        action_mapping = {
            0: (0, -5),  # Up
            1: (0, 5),   # Down
            2: (-5, 0),  # Left
            3: (5, 0)    # Right
        }
        # Apply actions to move squares
        for i, action in enumerate(actions):
            # Determine which square to move based on action index
            square_index = i // 4
            move_direction = action % 4
            # Move the selected square based on the action
            x, y, size, color = self.squares[square_index]
            dx, dy = action_mapping[move_direction]
            # Update square position
            x += dx
            y += dy
            # Check boundaries
            if x < 0:
                x = 0
            elif x + size > self.width:
                x = self.width - size
            if y < 0:
                y = 0
            elif y + size > self.height:
                y = self.height - size
            self.squares[square_index] = (x, y, size, color)
        # Check collisions
        for i, j, k in itertools.combinations(range(len(self.squares)), 3):
            # Check if square1 and square2 collide and have the same color
            if self.collide(self.squares[i], self.squares[j], self.squares[k]) and self.squares[j][3] == self.squares[k][3]:
                self.squares[i] = (self.squares[i][0], self.squares[i][1], self.squares[i][2], self.squares[j][3])
            # Check if square2 and square3 collide and have the same color
            elif self.collide(self.squares[j], self.squares[i], self.squares[k]) and self.squares[i][3] == self.squares[k][3]:
                self.squares[j] = (self.squares[j][0], self.squares[j][1], self.squares[j][2], self.squares[i][3])
            # Check if square1 and square3 collide and have the same color
            elif self.collide(self.squares[k], self.squares[i], self.squares[j]) and self.squares[i][3] == self.squares[j][3]:
                self.squares[k] = (self.squares[k][0], self.squares[k][1], self.squares[k][2], self.squares[i][3])
        # Count number of blue and red squares
        num_blue = sum(1 for _, _, _, c in self.squares if c == (0, 0, 255))
        num_red = sum(1 for _, _, _, c in self.squares if c == (255, 0, 0))
        # Calculate reward as the difference between blue and red squares
        reward = (num_blue - num_red)
        print("reward", reward)
        #if num_red > num_blue:
        #    reward = 1
        #elif num_blue > num_red:
        #    reward = -1
        #else:
        #    reward = 0
        # Update observation
        observation = self.get_observation()
        # Check if game is done
        done = (num_red == 0 or num_blue == 0)
        # Additional information, if needed
        info = {}
        return observation, reward, done, info
    def reset(self):
        # Reset squares
        self.squares = []
        for _ in range(num_squares):
            self.squares.append(self.create_square(np.zeros((self.height, self.width, 3), dtype=np.uint8)))
        # Return initial observation
        return self.get_observation()

    def get_observation(self):
        # Extract positions, size, and color of each square
        observation = np.zeros((num_squares, 6), dtype=np.uint16)
        for i, square in enumerate(self.squares):
            x, y, size, color = square
            observation[i] = x, y, size, color[0], color[1], color[2]
        return observation.flatten()

    def render(self, mode='rgb_array'):
        # Render the squares
        image = np.zeros((self.height, self.width, 3), dtype=np.uint8)
        for square in self.squares:
            x, y, size, color = square
            cv2.rectangle(image, (x, y), (x + size, y + size), color, -1)
        if mode == 'rgb_array':
            return image
        elif mode == 'human':
            cv2.imshow('Game', image)
            cv2.waitKey(30)  # Small delay for rendering
        else:
            super(SquareGameEnv, self).render(mode=mode)

    def close(self):
        cv2.destroyAllWindows()
# Testing the environment
def test():
    env = SquareGameEnv()
    check_env(env)
    episodes = 1
    for episode in range(episodes):
        done = False
        obs = env.reset()
        while not done:
            random_actions = [random.randint(0, 3) for _ in range(4 * num_squares)]
            obs, reward, done, info = env.step(random_actions)
            env.render(mode='human')
def learn():
    if not os.path.exists(models_dir):
        os.makedirs(models_dir)
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    env = SquareGameEnv()
    check_env(env)
    env.reset()
    model = PPO('MlpPolicy', env, verbose=1, tensorboard_log=log_dir)
    TIMESTEPS = 10
    episodes = 10
    iters = 0
    for i in range(episodes):
        iters += 1
        model.learn(total_timesteps=TIMESTEPS, reset_num_timesteps=False)
        model.save(f"{models_dir}/{TIMESTEPS * iters}")
        print("episode ", i)
def play():
    env = SquareGameEnv()
    env.display = True
    env.reset()
    model_path = f"{models_dir}/1000.zip"
    model = PPO.load(model_path, env=env)
    episodes = 50
    for ep in range(episodes):
        obs = env.reset()
        done = False
        while not done:
            action, _states = model.predict(obs)
            obs, rewards, done, info = env.step(action)
            print(rewards)
test()
learn()
My requirements.txt file is below; it also lists some other packages I have installed:
decorator==5.1.1
exceptiongroup==1.2.1
executing==2.0.1
Farama-Notifications==0.0.4
filelock==3.13.1
fonttools==4.51.0
fsspec==2024.2.0
grpcio==1.63.0
gym==0.21.0
gym-notices==0.0.8
gymnasium==0.29.1
intel-openmp==2021.4.0
ipython==8.24.0
jedi==0.19.1
Jinja2==3.1.3
kiwisolver==1.4.5
Markdown==3.6
MarkupSafe==2.1.5
matplotlib==3.8.4
matplotlib-inline==0.1.7
mkl==2021.4.0
mpmath==1.3.0
networkx==3.2.1
numpy==1.26.3
opencv-python==4.9.0.80
packaging==24.0
pandas==2.2.2
parso==0.8.4
pillow==10.2.0
prompt-toolkit==3.0.43
protobuf==5.26.1
pure-eval==0.2.2
Pygments==2.18.0
pyparsing==3.1.2
python-dateutil==2.9.0.post0
pytz==2024.1
six==1.16.0
stable-baselines3==1.6.0
stack-data==0.6.3
sympy==1.12
tbb==2021.11.0
tensorboard==2.16.2
tensorboard-data-server==0.7.2
torch==2.3.0+cu118
torchaudio==2.3.0+cu118
torchvision==0.18.0+cu118
traitlets==5.14.3
typing_extensions==4.9.0
tzdata==2024.1
wcwidth==0.2.13
Werkzeug==3.0.3