From a09aab61f309a9b3d36efb04c692c187c51287d7 Mon Sep 17 00:00:00 2001 From: ikkamens Date: Thu, 11 Feb 2021 20:11:15 +0100 Subject: [PATCH] Adds comments and some typing for readability --- pilco/models/mgpr.py | 9 +++++++-- pilco/models/pilco.py | 15 ++++++++++++++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/pilco/models/mgpr.py b/pilco/models/mgpr.py index a13f34e..0bd2ea5 100644 --- a/pilco/models/mgpr.py +++ b/pilco/models/mgpr.py @@ -1,3 +1,5 @@ +from typing import Tuple + import tensorflow as tf from tensorflow_probability import distributions as tfd import gpflow @@ -5,7 +7,7 @@ import numpy as np float_type = gpflow.config.default_float() -def randomize(model, mean=1, sigma=0.01): +def randomize(model: gpflow.models.GPR, mean=1, sigma=0.01): model.kernel.lengthscales.assign( mean + sigma*np.random.normal(size=model.kernel.lengthscales.shape)) model.kernel.variance.assign( @@ -15,6 +17,7 @@ def randomize(model, mean=1, sigma=0.01): mean + sigma*np.random.normal()) class MGPR(gpflow.Module): + """Multivariate Gaussian Process Regression""" def __init__(self, data, name=None): super(MGPR, self).__init__(name) @@ -35,7 +38,7 @@ def create_models(self, data): self.models.append(gpflow.models.GPR((data[0], data[1][:, i:i+1]), kernel=kern)) self.models[-1].likelihood.prior = tfd.Gamma(to_default_float(1.2), to_default_float(1/0.05)) - def set_data(self, data): + def set_data(self, data: Tuple): for i in range(len(self.models)): if isinstance(self.models[i].data[0], gpflow.Parameter): self.models[i].X.assign(data[0]) @@ -75,10 +78,12 @@ def optimize(self, restarts=1): model.likelihood.variance.assign(best_params["l_variance"]) def predict_on_noisy_inputs(self, m, s): + """Apply the learned model of the environment to predict the change of the state.""" iK, beta = self.calculate_factorizations() return self.predict_given_factorizations(m, s, iK, beta) def calculate_factorizations(self): + """TODO document me""" K = self.K(self.X) batched_eye = tf.eye(tf.shape(self.X)[0], batch_shape=[self.num_outputs], dtype=float_type) L = tf.linalg.cholesky(K + self.noise[:, None, None]*batched_eye) diff --git a/pilco/models/pilco.py b/pilco/models/pilco.py index b09e7cb..39145ab 100644 --- a/pilco/models/pilco.py +++ b/pilco/models/pilco.py @@ -1,3 +1,5 @@ +from typing import Tuple + import numpy as np import tensorflow as tf import gpflow @@ -13,7 +15,7 @@ from gpflow import set_trainable class PILCO(gpflow.models.BayesianModel): - def __init__(self, data, num_induced_points=None, horizon=30, controller=None, + def __init__(self, data: Tuple, num_induced_points=None, horizon=30, controller=None, reward=None, m_init=None, S_init=None, name=None): super(PILCO, self).__init__(name) if num_induced_points is None: @@ -113,9 +115,19 @@ def optimize_policy(self, maxiter=50, restarts=1): set_trainable(param, True) def compute_action(self, x_m): + """Computes action for a real interaction with environment. + + Unlike in approximate inference, we don't have any uncertainty about the state, + therefore covariance is set to zero.""" + return self.controller.compute_action(x_m, tf.zeros([self.state_dim, self.state_dim], float_type))[0] def predict(self, m_x, s_x, n): + """Do approximate inference for n time steps into the future. + + Returns the distribution over the state after n steps (mean and sigma), + and the total reward.""" + loop_vars = [ tf.constant(0, tf.int32), m_x, @@ -138,6 +150,7 @@ def predict(self, m_x, s_x, n): def propagate(self, m_x, s_x): m_u, s_u, c_xu = self.controller.compute_action(m_x, s_x) + # find mean, sigma for the concatenated vector of state and action (x~ in the paper) m = tf.concat([m_x, m_u], axis=1) s1 = tf.concat([s_x, s_x@c_xu], axis=1) s2 = tf.concat([tf.transpose(s_x@c_xu), s_u], axis=1)