I am developing an agent-based model in which agents are placed on an NK landscape (a combinatorial optimization problem) and are tasked with finding the highest point by manipulating a bit string. Agents do not interact with one another, and each has a choice between two different climbing methods. At each time step an agent must decide which climbing method to use based on the information available to it (current fitness, and potentially previous fitness). After applying a climbing method it receives feedback in the form of its new altitude. I see this as a version of the multi-armed bandit problem, with each agent facing its own two-armed bandit.
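To make the setup concrete, here is a minimal sketch of the per-agent loop I have in mind; the toy landscape, the two climbing-method implementations, and the random choice of method are placeholders rather than my actual model:

import numpy as np

rng = np.random.default_rng(0)

def local_step(solution, fitness):
    # placeholder climbing method 1: flip one random bit, keep it if fitness improves
    candidate = solution.copy()
    candidate[rng.integers(len(candidate))] ^= 1
    return candidate if fitness(candidate) > fitness(solution) else solution

def long_jump(solution, fitness):
    # placeholder climbing method 2: flip several bits at once (a riskier move)
    candidate = solution.copy()
    candidate[rng.choice(len(candidate), size=3, replace=False)] ^= 1
    return candidate if fitness(candidate) > fitness(solution) else solution

def run_agent(fitness, n_bits=15, steps=200):
    solution = rng.integers(0, 2, n_bits)
    methods = [local_step, long_jump]
    for step in range(steps):
        # the decision I want to learn: which climbing method to apply,
        # given the agent's current (and possibly previous) fitness
        arm = rng.integers(2)           # placeholder policy; the bandit algorithm goes here
        solution = methods[arm](solution, fitness)
        reward = fitness(solution)      # feedback: the agent's new altitude
    return solution

# toy stand-in for the NK landscape: fitness = fraction of 1-bits
run_agent(lambda s: s.mean())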
While I thought Thompson Sampling was an obvious choice, there are some characteristics of my model that seem to break it. For example, the landscape payoffs are initially drawn from a uniform distribution on [0, 1], and they are not i.i.d. or MDS, because different solutions are interdependent (due to how fitness is calculated on an NK landscape) and past decisions influence future payoffs. The landscape is also turbulent, in that it is shifted by a shock every t time steps.
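In bandit terms, the turbulence looks roughly like the following (a sketch only: the shock interval, shock size, and reward noise are made-up values, and my real model re-draws NK fitness contributions rather than shifting arm means):

import numpy as np

rng = np.random.default_rng(0)
arm_means = rng.uniform(0, 1, size=2)   # initial payoffs drawn uniformly from [0, 1]
shock_interval = 50                     # the "t" in the description (made-up value)

for step in range(500):
    if step > 0 and step % shock_interval == 0:
        # shock: perturb the payoff structure underneath the learner
        arm_means = np.clip(arm_means + rng.normal(0, 0.2, size=2), 0, 1)
    arm = rng.integers(2)               # placeholder policy; Thompson Sampling would go here
    reward = rng.normal(arm_means[arm], 0.1)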
Basically, my question is: can Thompson Sampling solve this problem, and if not, is there an RL algorithm that is robust enough to deal with its nature?
Here is the core of my Thompson Sampling algorithm, written in Python as a multi-armed bandit that uses an underlying Gaussian distribution for each arm's rewards as a test environment to verify the algorithm works. When I replace the reward function with a bounded uniform distribution, it breaks down.
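Concretely, the breakdown case just swaps the Gaussian reward draw for a bounded uniform one, roughly like this (the interval width is a made-up value, not the one from my model):

import numpy as np

def uniform_reward(mu, half_width=0.1):
    # bounded-uniform reward centred on mu, used in place of np.random.normal(mu, sd);
    # the Normal-Gamma model below is then fitting a mis-specified likelihood
    return np.random.uniform(mu - half_width, mu + half_width)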
Distribution approximation: Gaussian with unknown mean and variance
Conjugate prior: Normal-Gamma
Wikipedia: http://en.wikipedia.org/wiki/Normal-gamma_distribution
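For reference, given $n$ observations with sample mean $\bar{x}$, the standard Normal-Gamma posterior updates that the update step below is meant to implement are:

$$\mu_n = \frac{\lambda_0 \mu_0 + n\bar{x}}{\lambda_0 + n}, \qquad \lambda_n = \lambda_0 + n, \qquad \alpha_n = \alpha_0 + \frac{n}{2},$$

$$\beta_n = \beta_0 + \frac{1}{2}\left(\sum_{i=1}^{n}(x_i - \bar{x})^2 + \frac{\lambda_0\, n\, (\bar{x} - \mu_0)^2}{\lambda_0 + n}\right).$$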
Set up
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import gamma, norm
def sn_calculate(old_mean, new_mean, number):
    # function used to iteratively calculate the new sn, given:
    # - old mean (without the current number)
    # - new mean (with the current number)
    # - the new sample
    # in order to get the variance, divide sn by n
    sn = (number - new_mean) * (number - old_mean)
    return sn
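This is the incremental (Welford-style) update of the sum of squared deviations; here is a quick sanity check, not part of the original script, that it matches the batch computation:

# sanity check: the incremental sn should equal the batch sum of squared deviations
xs = np.random.normal(0, 1, 1000)
mean, sn = 0.0, 0.0
for k, x in enumerate(xs, start=1):
    old_mean = mean
    mean += (x - mean) / k
    sn += sn_calculate(old_mean, mean, x)
assert np.isclose(sn, np.sum((xs - xs.mean()) ** 2))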
Initialize machines
class Machine:
    def __init__(self, mu, tau):
        # Real parameters
        self.mu = mu
        self.tau = tau
        # Make a very simple prior
        # Prior parameters:
        # mu    - 1x1 - prior mean
        # n_mu  - 1x1 - number of observations of the mean
        # tau   - 1x1 - prior precision (1 / variance)
        # n_tau - 1x1 - number of observations of tau
        mu = 0.
        n_mu = 1.
        tau = 1.
        n_tau = 1.
        # Prior parameters after conversion:
        # mu0     - prior mean
        # lambda0 - pseudo-observations for the prior mean
        # alpha0  - shape (gamma prior on the precision)
        # beta0   - rate (gamma prior on the precision)
        self.mu0 = mu
        self.lambda0 = n_mu
        self.alpha0 = n_tau * 0.5
        self.beta0 = (0.5 * n_tau) / tau
        # Parameters of the posterior (initialized to the prior)
        self.model_mu = self.mu0
        self.model_lambda = self.lambda0
        self.model_alpha = self.alpha0
        self.model_beta = self.beta0
        # Additional running statistics
        self.sn = 0
        self.n = 0
        self.mean = 0

    def useMachine(self):
        # use the real machine (that we try to model)
        draw = np.random.normal(self.mu, np.sqrt(1 / self.tau))
        return draw

    def sampleParameters_tau(self):
        # returns a sample of the precision (from the gamma distribution)
        # that is used in the model of the machine
        tau = np.random.gamma(shape=self.model_alpha, scale=(1. / self.model_beta))
        return tau

    def sampleParameters_mean(self, tau):
        # returns a sample of the mean (from the Gaussian distribution)
        # that is used in the model of the machine
        var = 1. / (self.model_lambda * tau)
        mean = np.random.normal(loc=self.model_mu, scale=np.sqrt(var))
        return mean

    def sampleFromOurModelOfMachine(self):
        # returns a sample from our model of the machine (Gaussian distribution)
        tau = self.sampleParameters_tau()
        mean = self.sampleParameters_mean(tau)
        gaus_number = np.random.normal(mean, np.sqrt(1 / tau))
        return gaus_number

    def updateOurModelOfMachine(self, MachineReturned):
        # Update the parameters of the model
        # Running statistics (sample mean and sum of squared deviations)
        old_mean = self.mean
        self.n += 1
        self.mean += (MachineReturned - self.mean) / self.n
        self.sn += sn_calculate(old_mean, self.mean, MachineReturned)
        # Normal-Gamma posterior parameters
        self.model_lambda += 1
        self.model_mu += (MachineReturned - self.model_mu) / self.model_lambda
        self.model_alpha += 0.5
        prior_disc = self.lambda0 * self.n * ((self.mean - self.mu0) ** 2) / self.model_lambda
        self.model_beta = self.beta0 + 0.5 * (self.sn + prior_disc)

    # The four print* methods below are minimal placeholder implementations
    # (their definitions were not included in the paste): they plot simple
    # visualisations of the fitted model and print point estimates.
    def printTau(self):
        # posterior (gamma) density of the precision, plus its posterior mean
        x = np.linspace(1e-3, 10, 200)
        plt.plot(x, gamma.pdf(x, a=self.model_alpha, scale=1. / self.model_beta))
        print("Estimated tau: %.3f (real: %.3f)" % (self.model_alpha / self.model_beta, self.tau))

    def printMu(self):
        # approximate posterior of the mean (normal with plug-in precision estimate)
        est_sd = np.sqrt(self.model_beta / (self.model_alpha * self.model_lambda))
        x = np.linspace(self.model_mu - 4 * est_sd, self.model_mu + 4 * est_sd, 200)
        plt.plot(x, norm.pdf(x, loc=self.model_mu, scale=est_sd))
        print("Estimated mu: %.3f (real: %.3f)" % (self.model_mu, self.mu))

    def printModel(self):
        # fitted reward distribution using posterior point estimates
        est_sd = np.sqrt(self.model_beta / self.model_alpha)
        x = np.linspace(self.model_mu - 4 * est_sd, self.model_mu + 4 * est_sd, 200)
        plt.plot(x, norm.pdf(x, loc=self.model_mu, scale=est_sd))
        print("Model: N(mu=%.3f, sd=%.3f)" % (self.model_mu, est_sd))

    def printReal(self):
        # real reward distribution of the machine
        real_sd = np.sqrt(1. / self.tau)
        x = np.linspace(self.mu - 4 * real_sd, self.mu + 4 * real_sd, 200)
        plt.plot(x, norm.pdf(x, loc=self.mu, scale=real_sd))
        print("Real: N(mu=%.3f, sd=%.3f)" % (self.mu, real_sd))
Initialize experiments
def experiment(parameters, N):
    # Run an experiment with the machines described by "parameters".
    # The machines are pulled "N" times in total.
    # To save all machines
    machines = []
    # Create a machine for each set of parameters
    for p in parameters:
        (mn, ta) = p
        machines.append(Machine(mn, ta))
    # Place to save all the results of using the best machine (according to our models)
    results_DB = np.empty(N)
    # Execute N pulls
    for i in range(N):
        # Thompson step: sample a mean from each machine's posterior and pick the highest
        best_machine = np.argmax(
            [single_machine.sampleParameters_mean(single_machine.sampleParameters_tau())
             for single_machine in machines])
        # Get a real sample from the chosen machine
        sample = machines[best_machine].useMachine()
        # Update the model of this machine based on the sample
        machines[best_machine].updateOurModelOfMachine(sample)
        # And save the sample to results_DB
        results_DB[i] = sample
    # Print results
    print("Tau assessment")
    plt.figure()
    for single_machine in machines:
        single_machine.printTau()
    print("======")
    print("")
    plt.figure()
    print("Mean assessment")
    for single_machine in machines:
        single_machine.printMu()
    print("======")
    print("")
    plt.figure()
    print("Full model calculation")
    for single_machine in machines:
        single_machine.printModel()
    print("======")
    print("")
    plt.figure()
    print("Real values")
    for single_machine in machines:
        single_machine.printReal()
    print("======")
    # Return the DB with all results
    return results_DB
Run
# number of pulls (time steps)
N = 10000
# Parameters of machines
# Mean
m1 = 0.8
m2 = .9
m3 = .75
# Precision
t1 = 4
t2 = 2
t3 = 1
# Run the experiment
results = experiment([(m1, t1), (m2, t2), (m3, t3)], N)
# Draw the efficiency of the Thompson algorithm (cumulative average reward)
cumulative_average = np.cumsum(results) / (np.arange(N) + 1)
# draw results
plt.figure()
plt.plot(cumulative_average)
plt.plot(np.ones(N) * m1)
plt.plot(np.ones(N) * m2)
plt.plot(np.ones(N) * m3)
plt.xscale('log')
plt.title("Performance of the Thompson Algorithm")
plt.show()