I am working on a project comparing different modeling techniques for optimizing customer waiting times at a movie theater. Specifically, I have built simple models using:
Discrete Event Simulation (DES)
Agent-Based Modeling (ABM)
Q-Learning based Reinforcement Learning
The DES model behaves realistically: the waiting time changes appropriately when I modify parameters such as the number of customers and servers. The ABM and Q-Learning models do not exhibit this behavior; their waiting times remain almost the same regardless of parameter changes.
Here is a brief overview of my implementations:
DES implementation using SimPy
import simpy
import random
import statistics
wait_times = []
class Theater(object):
    def __init__(self, env, num_cashiers, num_servers, num_ushers):
        self.env = env
        self.cashier = simpy.Resource(env, num_cashiers)
        self.server = simpy.Resource(env, num_servers)
        self.usher = simpy.Resource(env, num_ushers)

    def purchase_ticket(self, moviegoer):
        yield self.env.timeout(random.randint(1, 3))

    def check_ticket(self, moviegoer):
        yield self.env.timeout(3 / 60)

    def sell_food(self, moviegoer):
        yield self.env.timeout(random.randint(1, 6))

def go_to_movies(env, moviegoer, theater):
    arrival_time = env.now
    # buy ticket
    with theater.cashier.request() as request:
        yield request
        yield env.process(theater.purchase_ticket(moviegoer))
    # check ticket
    with theater.usher.request() as request:
        yield request
        yield env.process(theater.check_ticket(moviegoer))
    # buy food
    if random.choice([True, False]):
        with theater.server.request() as request:
            yield request
            yield env.process(theater.sell_food(moviegoer))
    # go to their seat
    wait_times.append(env.now - arrival_time)

def run_theater(env, num_cashiers, num_servers, num_ushers, total_moviegoers):
    theater = Theater(env, num_cashiers, num_servers, num_ushers)
    moviegoer_id = 0
    # seed the theater with three moviegoers already present at t=0
    for moviegoer in range(3):
        env.process(go_to_movies(env, moviegoer, theater))
        moviegoer_id += 1
    # further moviegoers arrive every 0.20 minutes
    while moviegoer_id < total_moviegoers:
        yield env.timeout(0.20)
        env.process(go_to_movies(env, moviegoer_id, theater))
        moviegoer_id += 1

def calculate_wait_times(wait_times):
    average_wait = statistics.mean(wait_times)
    # pretty print results
    minutes, frac_minutes = divmod(average_wait, 1)
    seconds = frac_minutes * 60
    return round(minutes), round(seconds)

def get_user_input():
    num_cashiers = 10
    num_servers = 20
    num_ushers = 5
    params = [num_cashiers, num_servers, num_ushers]
    if all(str(i).isdigit() for i in params):
        params = [int(x) for x in params]
    else:
        print(
            "Could not parse input. The simulation will use default values:",
            "\n1 cashier, 1 server, 1 usher.",
        )
        params = [1, 1, 1]
    return params

def main():
    # Setup
    random.seed(42)
    num_cashiers, num_servers, num_ushers = get_user_input()
    total_moviegoers = 100  # Number of moviegoers to simulate
    # Run the simulation
    env = simpy.Environment()
    env.process(run_theater(env, num_cashiers, num_servers, num_ushers, total_moviegoers))
    env.run()
    # View the results
    mins, secs = calculate_wait_times(wait_times)
    print(
        "Running simulation...",
        f"\nThe average wait time is {mins} minutes and {secs} seconds.",
        f"\nTotal number of moviegoers: {total_moviegoers}",
    )

if __name__ == "__main__":
    main()
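For reference, this is roughly how I verify that the DES model responds to parameters: a quick sweep over the number of cashiers (a minimal sketch; the sweep values are arbitrary, and it assumes the definitions above are in the same file):

# Parameter-sensitivity check for the DES model above.
for n_cashiers in [1, 2, 5, 10]:
    wait_times.clear()   # reset the module-level results list between runs
    random.seed(42)      # same random stream so runs are comparable
    env = simpy.Environment()
    env.process(run_theater(env, n_cashiers, num_servers=20, num_ushers=5,
                            total_moviegoers=100))
    env.run()
    mins, secs = calculate_wait_times(wait_times)
    print(f"{n_cashiers} cashiers -> average wait {mins} min {secs} s")

With more cashiers, the average wait drops noticeably, which is the behavior I am missing in the other two models.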
Agent-based model using Mesa
from mesa import Agent, Model
from mesa.time import RandomActivation
from mesa.space import MultiGrid
from mesa.datacollection import DataCollector
import random
class MoviegoerAgent(Agent):
    def __init__(self, unique_id, model):
        super().__init__(unique_id, model)
        self.wait_time = 0

    def step(self):
        # simulate arrival at the theater and the various steps
        self.buy_ticket()
        self.check_ticket()
        if random.choice([True, False]):
            self.buy_food()
        self.model.wait_times.append(self.wait_time)

    def buy_ticket(self):
        cashier = random.choice(self.model.cashiers)
        self.wait_time += random.randint(1, 3)
        cashier.busy = False

    def check_ticket(self):
        usher = random.choice(self.model.ushers)
        self.wait_time += 3 / 60
        usher.busy = False

    def buy_food(self):
        server = random.choice(self.model.servers)
        self.wait_time += random.randint(1, 6)
        server.busy = False

class TheaterAgent(Agent):
    def __init__(self, unique_id, model):
        super().__init__(unique_id, model)
        self.busy = False

class TheaterModel(Model):
    def __init__(self, num_cashiers, num_servers, num_ushers, num_moviegoers):
        self.num_agents = num_moviegoers
        self.schedule = RandomActivation(self)
        self.grid = MultiGrid(1, 1, False)
        self.wait_times = []
        # Create cashiers
        self.cashiers = []
        for i in range(num_cashiers):
            cashier = TheaterAgent(i, self)
            self.schedule.add(cashier)
            self.cashiers.append(cashier)
        # Create servers
        self.servers = []
        for i in range(num_cashiers, num_cashiers + num_servers):
            server = TheaterAgent(i, self)
            self.schedule.add(server)
            self.servers.append(server)
        # Create ushers
        self.ushers = []
        for i in range(num_cashiers + num_servers, num_cashiers + num_servers + num_ushers):
            usher = TheaterAgent(i, self)
            self.schedule.add(usher)
            self.ushers.append(usher)
        # Create moviegoers
        for i in range(num_moviegoers):
            moviegoer = MoviegoerAgent(i + num_cashiers + num_servers + num_ushers, self)
            self.schedule.add(moviegoer)

    def step(self):
        self.schedule.step()

def run_theater_simulation(num_cashiers, num_servers, num_ushers, num_moviegoers, num_steps):
    model = TheaterModel(num_cashiers, num_servers, num_ushers, num_moviegoers)
    for _ in range(num_steps):
        model.step()
    return model.wait_times

def calculate_average_wait_time(wait_times):
    average_wait = sum(wait_times) / len(wait_times)
    minutes, frac_minutes = divmod(average_wait, 1)
    seconds = frac_minutes * 60
    return round(minutes), round(seconds)

if __name__ == "__main__":
    num_cashiers = 1
    num_servers = 2
    num_ushers = 1
    num_moviegoers = 100
    num_steps = 100
    wait_times = run_theater_simulation(num_cashiers, num_servers, num_ushers, num_moviegoers, num_steps)
    mins, secs = calculate_average_wait_time(wait_times)
    print(f"The average wait time is {mins} minutes and {secs} seconds.")
Q-Learning implementation using Gym
import numpy as np
import gym
from gym import spaces
import random
class TheaterEnv(gym.Env):
    def __init__(self, num_cashiers, num_servers, num_ushers):
        super(TheaterEnv, self).__init__()
        self.num_cashiers = num_cashiers
        self.num_servers = num_servers
        self.num_ushers = num_ushers
        # Define action and observation space
        self.action_space = spaces.Discrete(3)  # Three actions: manage cashier, server, usher
        self.observation_space = spaces.MultiDiscrete([num_cashiers + 1, num_servers + 1, num_ushers + 1])
        self.reset()

    def reset(self):
        # Reset the state of the environment to an initial state
        self.state = [self.num_cashiers, self.num_servers, self.num_ushers]
        self.total_wait_time = 0
        self.steps = 0
        self.moviegoer_count = 0
        return np.array(self.state)

    def step(self, action):
        self.steps += 1
        done = self.steps >= 100
        if action == 0 and self.state[0] > 0:
            wait_time = random.randint(1, 3)
            self.state[0] -= 1
        elif action == 1 and self.state[1] > 0:
            wait_time = random.randint(1, 6)
            self.state[1] -= 1
        elif action == 2 and self.state[2] > 0:
            wait_time = 3 / 60
            self.state[2] -= 1
        else:
            wait_time = 1  # Penalty for invalid action
        self.total_wait_time += wait_time
        self.moviegoer_count += 1
        reward = -wait_time  # We want to minimize wait time
        return np.array(self.state), reward, done, {}

    def render(self, mode='human'):
        print(f"State: {self.state}, Total Wait Time: {self.total_wait_time}")

def q_learning(env, num_episodes=1000, learning_rate=0.1, discount_factor=0.99, epsilon=0.1):
    # Initialize Q-table with dimensions matching the state space
    q_table = np.zeros((env.observation_space.nvec[0], env.observation_space.nvec[1],
                        env.observation_space.nvec[2], env.action_space.n))
    for episode in range(num_episodes):
        state = env.reset()
        done = False
        while not done:
            if random.uniform(0, 1) < epsilon:
                action = env.action_space.sample()  # Explore action space
            else:
                action = np.argmax(q_table[state[0], state[1], state[2]])  # Exploit learned values
            next_state, reward, done, _ = env.step(action)
            old_value = q_table[state[0], state[1], state[2], action]
            next_max = np.max(q_table[next_state[0], next_state[1], next_state[2]])
            new_value = (1 - learning_rate) * old_value + learning_rate * (reward + discount_factor * next_max)
            q_table[state[0], state[1], state[2], action] = new_value
            state = next_state
    return q_table

if __name__ == "__main__":
    num_cashiers = 10
    num_servers = 100
    num_ushers = 1
    env = TheaterEnv(num_cashiers, num_servers, num_ushers)
    q_table = q_learning(env, num_episodes=100)
    state = env.reset()
    done = False
    total_wait_time = 0
    moviegoer_count = 0
    while not done:
        action = np.argmax(q_table[state[0], state[1], state[2]])
        state, reward, done, _ = env.step(action)
        total_wait_time += -reward  # Reward is negative of wait time
        moviegoer_count += 1
    average_wait_time = total_wait_time / moviegoer_count
    # Calculate minutes and seconds
    average_wait_time_minutes = int(average_wait_time)
    average_wait_time_seconds = int((average_wait_time - average_wait_time_minutes) * 60)
    print(f"Total moviegoers served: {moviegoer_count}")
    print(f"Average wait time after Q-learning: {average_wait_time_minutes} minutes and {average_wait_time_seconds} seconds")
How can I make the ABM and Q-Learning models more realistic, similar to the DES model? What techniques or modifications can I apply to ensure that the waiting time in ABM and Q-Learning models changes appropriately with different parameter values?
Any insights, suggestions, or examples would be greatly appreciated. Thank you!