I am trying to use reinforcement learning to tune the gain of a discrete-time integrator controller. However, it does not converge, and the result is so sensitive to how I choose the parameters that I wonder whether my implementation is correct.
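To make the setup explicit, the loop implemented in Env.step below is

$$ y_k = d_k + M_{\mathrm{int}}\, u_k, \qquad u_{k+1} = a\, u_k - g\, M_{\mathrm{com}}\, y_k, \qquad M_{\mathrm{com}} = M_{\mathrm{int}}^{+}, $$

where $d_k$ is the disturbance (white noise here), $a = 0.99$ is the leaky gain, and $g \in [0, 1]$ is the integrator gain the agent has to learn. The per-step reward is $r_k = -\lVert y_k \rVert$.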
import time
import matplotlib.pyplot as plt
import numpy as np
from scipy.interpolate import make_interp_spline
%matplotlib inline
import sys
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K
import tensorflow as tf
import plotly.graph_objects as go
Actor-critic algorithm:
class A2CPIDTunner:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.value_size = 1
        self.grad_bound = 1           # gradient clipping bound
        self.std_bound = [1e-2, 0.1]  # min/max standard deviation of the policy
        self.discount_factor = 1
        self.actor = self.build_actor()
        self.critic = self.build_critic()
        self.actor_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
        self.critic_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)

    def build_actor(self):
        # Gaussian policy: the network outputs the mean and standard deviation of the action
        input_state = tf.keras.Input((self.state_size,))
        d1 = tf.keras.layers.Dense(10, activation='relu')(input_state)
        d2 = tf.keras.layers.Dense(10, activation='relu')(d1)
        d3 = tf.keras.layers.Dense(10, activation='relu')(d2)
        out_mu = tf.keras.layers.Dense(self.action_size, activation='sigmoid')(d3)
        out_std = tf.keras.layers.Dense(self.action_size, activation='sigmoid')(d3)
        #out_std = tf.keras.layers.Lambda(lambda x: x * 0.1)(dense_layer)
        actor = tf.keras.Model(input_state, [out_mu, out_std])
        return actor

    def build_critic(self):
        # state-value network
        input_state = tf.keras.Input((self.state_size,))
        d1 = tf.keras.layers.Dense(10, activation='relu')(input_state)
        d2 = tf.keras.layers.Dense(10, activation='relu')(d1)
        d3 = tf.keras.layers.Dense(10, activation='relu')(d2)
        output = tf.keras.layers.Dense(1, activation='sigmoid')(d3)
        critic = tf.keras.Model(input_state, output)
        return critic

    def log_pdf(self, mu, std, action):
        # log-probability of the action under the Gaussian policy
        std = tf.clip_by_value(std, self.std_bound[0], self.std_bound[1])
        var = std ** 2
        log_policy_pdf = -0.5 * (action - mu) ** 2 / var - 0.5 * tf.math.log(var * 2 * np.pi)
        return tf.reduce_sum(log_policy_pdf, 1, keepdims=True)

    def get_action(self, state):
        # sample an action from the current policy
        mu, std = self.actor(np.reshape(state, [1, self.state_size]))
        mu = mu[0]
        std = std[0]
        std = tf.clip_by_value(std, self.std_bound[0], self.std_bound[1])
        action = np.random.normal(mu, std, size=self.action_size)
        return action

    def train_actor(self, action, state, advantage):
        # policy-gradient step weighted by the advantage
        with tf.GradientTape() as t:
            mu_a, std_a = self.actor(state)
            log_policy_pdf = self.log_pdf(mu_a, std_a, action)
            loss = -K.sum(log_policy_pdf * advantage)
        g_theta = t.gradient(loss, self.actor.trainable_weights)
        grads = zip(g_theta, self.actor.trainable_weights)
        grads = [(tf.clip_by_value(grad, -self.grad_bound, self.grad_bound), var) for grad, var in grads]
        self.actor_optimizer.apply_gradients(grads)

    def train_critic(self, state, target):
        # fit the critic to the TD target
        with tf.GradientTape() as t:
            output = self.critic(state)
            loss = K.mean(K.square(target - output))
        g_omega = t.gradient(loss, self.critic.trainable_weights)
        grads = zip(g_omega, self.critic.trainable_weights)
        grads = [(tf.clip_by_value(grad, -self.grad_bound, self.grad_bound), var) for grad, var in grads]
        self.critic_optimizer.apply_gradients(grads)

    def train_model(self, state, action, reward, next_state, done):
        # one-step TD advantage and target
        value = self.critic(state)[0]
        next_value = self.critic(next_state)[0]
        advantage = reward - value + (1 - done) * (self.discount_factor * next_value)
        target = reward + (1 - done) * (self.discount_factor * next_value)
        self.train_actor(action, state, advantage)
        self.train_critic(state, target)
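As a quick sanity check, separate from the training loop, I exercise the agent on a dummy state to confirm the output shapes and that one update runs without errors (the names here are only for this check):

# quick shape/sanity check of the agent on a dummy state (illustrative only)
check_agent = A2CPIDTunner(state_size=4, action_size=1)
dummy_state = np.zeros(4)
sampled = check_agent.get_action(dummy_state)
print('sampled gain:', sampled)   # one value, drawn from N(mu, std) with mu in (0, 1)
# one actor-critic update with a fake transition and reward
check_agent.train_model(dummy_state.reshape(1, 4), sampled, -1.0,
                        dummy_state.reshape(1, 4), False)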
class Env():
    def __init__(self, n_state, n_action):
        self.observation_space = (n_state)
        self.action_space = (n_action)
        self.action_bound = (1)
        self.Mint = np.random.rand(2, 2)        # interaction (plant) matrix
        self.Mcom = np.linalg.pinv(self.Mint)   # command matrix = pseudo-inverse of the plant

    def reset(self):
        self.state = np.zeros(self.observation_space)
        self.action = np.zeros(self.action_space)
        self.gain = 0
        self.reward = 0
        self.u = np.zeros((2, 1))
        self.y = np.zeros((2, 1))
        self.previous_y = np.zeros((2, 1))
        self.previous_u = np.zeros((2, 1))
        return self.state

    def step(self, action, k):
        self.gain = action
        self.gain = max(0, min(1, self.gain))   # clip the gain to [0, 1]
        self.previous_y = self.y
        self.previous_u = self.u
        #self.y = np.ones((2,1)) * np.sin(1/20*k) + self.Mint @ self.u
        self.y = np.random.randn(2, 1) + self.Mint @ self.u   # measurement = disturbance + plant response
        # integrator action
        self.u = leaky_gain * self.u - self.gain * self.Mcom @ self.y
        # set the reward
        self.reward = -np.linalg.norm(self.y)
        state = np.concatenate((self.y, self.previous_y))
        return state, self.reward
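Before involving the agent at all, I compare against a fixed-gain baseline: sweep a few constant integrator gains through the same environment and look at the average residual norm. The learned gain should at least be in the same ballpark as the best fixed gain. This helper is only for comparison and is not part of the tuner:

# baseline: run the loop with a fixed integrator gain, no learning involved
leaky_gain = 0.99   # Env.step reads this module-level variable

def fixed_gain_baseline(gain, n_steps=400, seed=1):
    np.random.seed(seed)   # same plant matrix and noise sequence for every gain
    env = Env(4, 1)
    env.reset()
    rewards = np.zeros(n_steps)
    for k in range(n_steps):
        _, rewards[k] = env.step(gain, k)
    return -np.mean(rewards)   # average residual norm, lower is better

for g in [0.1, 0.3, 0.5, 0.7, 0.9]:
    print('fixed gain', g, '-> mean |y|', fixed_gain_baseline(g))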
Main loop:
rand_seed = 1
np.random.seed(rand_seed)

leaky_gain = 0.99
n_temporal_samples = 400
state_size = 4
action_size = 1

next_state = np.zeros((state_size, 1))
state = np.zeros((state_size, 1))

env = Env(state_size, action_size)
agent = A2CPIDTunner(state_size, action_size)

y_out = np.zeros(n_temporal_samples)
gain_out = np.zeros(n_temporal_samples)

action = 0
reward = 0
done = False

for i in range(10):                       # episodes
    state = env.reset()
    for k in range(n_temporal_samples):   # time steps within one episode
        # update the networks before stepping the environment
        agent.train_model(state.reshape(1, state_size), action, reward, next_state.reshape(1, state_size), done)
        next_state, reward = env.step(action, k)
        state = next_state
        if k == n_temporal_samples - 1:
            # a new gain is sampled only at the last step of the episode
            # and is then held for the whole next episode
            action = agent.get_action(state)
            done = True
        else:
            done = False
        # for plotting only
        y_out[k] = reward
        gain_out[k] = action

fig = go.Figure()
fig.add_trace(go.Scatter(y=y_out))
fig.show()

fig = go.Figure()
fig.add_trace(go.Scatter(y=gain_out))
fig.add_trace(go.Scatter(y=np.clip(gain_out, 0, 1)))
fig.show()

print('residual norm y', np.linalg.norm(y_out))
print('mean value for integrator gain', np.mean(np.clip(gain_out, 0, 1)))
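To see what the networks actually end up predicting, I also print the actor's mean and standard deviation and the critic's value estimate for the last state (purely a diagnostic, not part of the training):

# inspect the learned policy and value estimate on the last state (diagnostic only)
mu, std = agent.actor(np.reshape(state, [1, state_size]))
print('policy mean (proposed gain):', float(mu[0][0]))
print('policy std before clipping:', float(std[0][0]))
print('critic value estimate:', float(agent.critic(np.reshape(state, [1, state_size]))[0][0]))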
The perturbation entering the system is white noise, although it could also be a sine wave (there is a commented-out line for that in Env.step). I have doubts about the choice of essentially all the parameters, as well as about the actor-critic implementation itself.