Source code for blackbox_mpc.optimizers.cem

import tensorflow as tf
import numpy as np
from blackbox_mpc.optimizers.optimizer_base import OptimizerBase


class CEMOptimizer(OptimizerBase):
    """Cross-Entropy Method (CEM) optimizer over action sequences.

    Each call to ``_optimize`` starts from a stored mean/variance pair (one
    Gaussian per agent, planning step and action dimension), repeatedly
    samples a population of action sequences, scores them with
    ``self._trajectory_evaluator`` and refits the Gaussian to the best
    ``num_elite`` samples.
    """

    def __init__(self, env_action_space, env_observation_space,
                 planning_horizon=50, max_iterations=5, population_size=500,
                 num_elite=50, num_agents=5, epsilon=0.001, alpha=0.25):
        """
        This Class defines a Cross-Entropy Method optimizer.
        (http://web.mit.edu/6.454/www/www_fall_2003/gew/CEtutorial.pdf)

        Parameters
        ---------
        env_action_space: gym.ActionSpace
            Defines the action space of the gym environment.
        env_observation_space: gym.ObservationSpace
            Defines the observation space of the gym environment.
        planning_horizon: Int
            Defines the planning horizon for the optimizer (how many steps
            to lookahead and optimize for).
        max_iterations: tf.int32
            Defines the maximum iterations for the CEM optimizer to refine
            its guess for the optimal solution.
        population_size: tf.int32
            Defines the population size of the particles evaluated at each
            iteration.
        num_elite: tf.int32
            Defines the number of elites kept for the next iteration from
            the population.
        num_agents: tf.int32
            Defines the number of runners running in parallel.
        epsilon: tf.float32
            Defines the epsilon threshold for the difference between
            iterations solutions so that the optimizer returns the solution
            earlier than max iterations.
            NOTE: the value is stored as ``self._epsilon`` but is not read
            anywhere in this class; the refinement loop always runs for
            ``max_iterations`` iterations.
        alpha: tf.float32
            Defines the weight of the solution at t-1 in determining the
            solution at t, ex: mean = alpha*old_mean + (1-alpha)*new_mean.
        """
        super().__init__(name=None,
                         planning_horizon=planning_horizon,
                         max_iterations=max_iterations,
                         num_agents=num_agents,
                         env_action_space=env_action_space,
                         env_observation_space=env_observation_space)
        self._solution_dim = [self._num_agents, self._planning_horizon,
                              self._dim_U]
        self._elites_dim = [int(self._num_agents), int(num_elite),
                            int(self._planning_horizon), int(self._dim_U)]
        self._population_size = population_size
        self._num_elite = num_elite
        self._epsilon = epsilon
        self._alpha = alpha

        # Initial mean: midpoint of the action bounds for every agent/step.
        previous_solution_values = self._initial_mean_values()
        self._previous_solution = tf.Variable(
            tf.zeros(shape=previous_solution_values.shape, dtype=tf.float32))
        self._previous_solution.assign(previous_solution_values)

        # Initial variance: (upper - lower)^2 / 16, i.e. a standard
        # deviation of a quarter of the action range.
        solution_variance_values = self._tile_over_solution(
            np.square(self._action_lower_bound - self._action_upper_bound)
            / 16)
        self._solution_variance = tf.Variable(
            tf.zeros(shape=solution_variance_values.shape, dtype=tf.float32))
        self._solution_variance.assign(solution_variance_values)

    def _tile_over_solution(self, per_dimension_values):
        """Repeat per-action-dimension values for every agent and step.

        Returns an array reshaped to
        ``[num_agents, planning_horizon, -1]``.
        """
        tiled = np.tile(per_dimension_values,
                        [self._planning_horizon * self._num_agents, 1])
        return tiled.reshape(
            [self._num_agents, self._planning_horizon, -1])

    def _initial_mean_values(self):
        """Return the default mean: the midpoint of the action bounds."""
        return self._tile_over_solution(
            (self._action_lower_bound + self._action_upper_bound) / 2)

    @tf.function
    def _optimize(self, current_state, time_step):
        """Run the CEM refinement loop and return the first planned action.

        Parameters
        ---------
        current_state:
            Forwarded unchanged to ``self._trajectory_evaluator``.
        time_step:
            Forwarded unchanged to ``self._trajectory_evaluator``.

        Returns
        -------
        resulting_action: tf.float32
            ``mean[:, 0]`` of the refined distribution, i.e. the action of
            the first planning step for each agent.
        """

        def continue_condition(t, mean, variance):
            # Only the iteration budget stops the loop (no early exit).
            return tf.less(t, self._max_iterations)

        def iterate(t, mean, variance):
            one = tf.constant(1, dtype=tf.float32)
            two = tf.constant(2, dtype=tf.float32)

            # Cap the variance so the standard deviation is at most half
            # the distance from the mean to the nearer action bound.
            lower_bound_dist = mean - self._action_lower_bound_horizon
            upper_bound_dist = self._action_upper_bound_horizon - mean
            constrained_variance = tf.minimum(
                tf.minimum(tf.square(lower_bound_dist / two),
                           tf.square(upper_bound_dist / two)),
                variance)

            # Sample the population:
            # [population_size, num_agents, planning_horizon, dim_U].
            samples = tf.random.truncated_normal(
                [self._population_size, *self._solution_dim],
                mean,
                tf.sqrt(constrained_variance),
                dtype=tf.float32)

            rewards = self._trajectory_evaluator(current_state, samples,
                                                 time_step)
            # Put the agent axis first so top_k ranks each agent's
            # population independently.
            rewards = tf.transpose(rewards, [1, 0])
            _, indices = tf.nn.top_k(rewards, k=self._num_elite, sorted=True)
            samples = tf.transpose(samples, [1, 0, 2, 3])

            # Batched gather: for every agent pick its own elite samples,
            # giving [num_agents, num_elite, planning_horizon, dim_U].
            elites = tf.gather(samples, indices, axis=1, batch_dims=1)
            elites.set_shape(self._elites_dim)

            new_mean = tf.reduce_mean(elites, axis=1)
            # Broadcasting over the elite axis replaces an explicit tile.
            new_variance = tf.reduce_mean(
                tf.square(elites - tf.expand_dims(new_mean, 1)), axis=1)

            # Exponential smoothing between the old and refitted estimates.
            mean = self._alpha * mean + (one - self._alpha) * new_mean
            variance = (self._alpha * variance
                        + (one - self._alpha) * new_variance)
            return t + tf.constant(1, dtype=tf.int32), mean, variance

        _, mean, variance = tf.while_loop(
            cond=continue_condition,
            body=iterate,
            loop_vars=[tf.constant(0, dtype=tf.int32),
                       self._previous_solution,
                       self._solution_variance])
        # TODO: the below line is causing problems with cheetah env
        # self.previous_solution.assign(mean)
        # NOTE(review): the commented-out line uses `previous_solution`,
        # while the attribute set in __init__ is `_previous_solution`.
        resulting_action = mean[:, 0]
        return resulting_action

    def reset(self):
        """
        This method resets the optimizer to its default state at the
        beginning of the trajectory/episode.

        Only the stored mean (``_previous_solution``) is restored to the
        midpoint of the action bounds; ``_solution_variance`` is left as is.
        """
        self._previous_solution.assign(self._initial_mean_values())