Source code for blackbox_mpc.trajectory_evaluators.deterministic

import tensorflow as tf
from blackbox_mpc.trajectory_evaluators.evaluator_base import EvaluatorBase


class DeterministicTrajectoryEvaluator(EvaluatorBase):
    """Trajectory evaluator that rolls out a deterministic dynamics function."""

    def __init__(self, reward_function, system_dynamics_handler):
        """
        Build the evaluator by forwarding both collaborators to the base class.

        Parameters
        ---------
        reward_function: tf_function
            Reward function called as
            tf_func_name(current_state, current_actions, next_state).
        system_dynamics_handler: SystemDynamicsHandler
            Handler whose process_input, _dynamics_function and
            process_output are used to predict the next state.
        """
        # The base class is always given name=None by this evaluator.
        super(DeterministicTrajectoryEvaluator, self).__init__(
            reward_function=reward_function,
            system_dynamics_handler=system_dynamics_handler,
            name=None)

    @tf.function
    def __call__(self, current_states, action_sequences, time_step):
        """
        Score every candidate action sequence by rolling it out from the
        current states and summing the per-step rewards.

        Parameters
        ---------
        current_states: tf.float32
            Current state of each agent, (dims = num_of_agents X dim_S).
        action_sequences: tf.float32
            Candidate action sequences,
            (dims = population X num_of_agents X planning_horizon X dim_U).
        time_step: tf.float32
            Current timestep of the episode. It is accepted for interface
            compatibility and is not read by this implementation.

        Returns
        -------
        rewards: tf.float32
            Accumulated reward of each sequence for each agent,
            (dims = population X num_of_agents). NaN entries are replaced
            by -1e6.
        """
        sequences_shape = tf.shape(action_sequences)
        population = sequences_shape[0]
        num_agents = sequences_shape[1]
        horizon = sequences_shape[2]
        action_dim = sequences_shape[3]

        # Merge the population and agent axes into one batch axis, then move
        # time to the front so that indexing by step yields one action batch.
        batched_actions = tf.reshape(action_sequences,
                                     [-1, horizon, action_dim])
        time_major_actions = tf.transpose(batched_actions, [1, 0, 2])

        # Repeat the agents' states once per population member; the row order
        # matches the merged batch axis built by the reshape above.
        start_states = tf.tile(current_states, [population, 1])
        start_rewards = tf.zeros([population * num_agents], dtype=tf.float32)
        start_step = tf.constant(0, dtype=tf.int32)

        def horizon_not_reached(step, accumulated_reward, state):
            return tf.less(step, horizon)

        def rollout_step(step, accumulated_reward, state):
            actions = time_major_actions[step]
            following_state = self.predict_next_state(state, actions)
            step_reward = self._reward_function(state, actions,
                                                following_state)
            return (step + tf.constant(1, dtype=tf.int32),
                    accumulated_reward + step_reward,
                    following_state)

        _, rewards, _ = tf.while_loop(
            cond=horizon_not_reached,
            body=rollout_step,
            loop_vars=[start_step, start_rewards, start_states]
        )

        rewards = tf.reshape(rewards, [population, num_agents])
        # Any rollout that produced NaN is given a large negative reward.
        nan_penalty = (tf.constant(-1e6, dtype=tf.float32)
                       * tf.ones_like(rewards))
        return tf.where(tf.math.is_nan(rewards), nan_penalty, rewards)

    @tf.function
    def predict_next_state(self, current_states, current_actions):
        """
        Predict the next states through the system dynamics handler.

        The inputs are preprocessed with process_input, passed to the
        handler's dynamics function with train=False, and the raw result is
        postprocessed with process_output.

        Parameters
        ---------
        current_states: tf.float32
            Batch of current states, (dims = batch X dim_S).
        current_actions: tf.float32
            Batch of actions to apply, (dims = batch X dim_U).

        Returns
        -------
        next_state: tf.float32
            Whatever process_output returns for the predicted next states
            (presumably batch X dim_S; confirm against the handler).
        """
        handler = self._system_dynamics_handler
        model_inputs = handler.process_input(current_states, current_actions)
        raw_prediction = handler._dynamics_function(
            model_inputs, train=tf.constant(False, dtype=tf.bool))
        return handler.process_output(current_states, raw_prediction)

    def evaluate_next_reward(self, current_states, next_states,
                             current_actions):
        """
        Evaluate the reward of a single transition with the reward function.

        Note that the reward function is called in the order
        (current_states, current_actions, next_states), which differs from
        this method's own argument order.

        Parameters
        ---------
        current_states: tf.float32
            States before the actions are applied.
        next_states: tf.float32
            States after the actions are applied.
        current_actions: tf.float32
            Actions applied in current_states.

        Returns
        -------
        reward: tf.float32
            Value returned by the reward function; its shape is determined
            by that function.
        """
        return self._reward_function(current_states, current_actions,
                                     next_states)