import tensorflow as tf
import numpy as np
[docs]class OptimizerBase(tf.Module):
[docs] def __init__(self, name, planning_horizon, max_iterations, num_agents,
env_action_space, env_observation_space):
"""
This is the base class of the optimizers
Parameters
---------
name: String
Defines the name of the block of the optimizer.
planning_horizon: Int
Defines the planning horizon for the optimizer (how many steps to lookahead and optimize for).
max_iterations: tf.int32
Defines the maximimum iterations for the CEM optimizer to refine its guess for the optimal solution.
num_agents: tf.int32
Defines the number of runner running in parallel
env_action_space: gym.ActionSpace
Defines the action space of the gym environment.
env_observation_space: tf.int32
Defines the observation space of the gym environment.
"""
super(OptimizerBase, self).__init__(name=name)
self._planning_horizon = planning_horizon
self._env_action_space = env_action_space
self._env_observation_space = env_observation_space
self._dim_U = tf.constant(env_action_space.shape[0], dtype=tf.int32)
self._dim_S = tf.constant(env_observation_space.shape[0], dtype=tf.int32)
self._action_upper_bound = tf.constant(env_action_space.high,
dtype=tf.float32)
self._action_lower_bound = tf.constant(env_action_space.low,
dtype=tf.float32)
self._action_upper_bound_horizon = tf.tile(
np.expand_dims(self._action_upper_bound, 0),
[self._planning_horizon, 1])
self._action_lower_bound_horizon = tf.tile(
np.expand_dims(self._action_lower_bound, 0),
[self._planning_horizon, 1])
self._num_agents = num_agents
self._max_iterations = max_iterations
self._trajectory_evaluator = None
self._exploration_variance = (np.square(self._action_lower_bound -
self._action_upper_bound) /
16) * 0.05
self._exploration_mean = (self._action_upper_bound +
self._action_lower_bound) / 2
def _optimize(self, current_state, time_step):
raise Exception("__call__ function is not implemented yet")
[docs] @tf.function
def __call__(self, current_state, time_step, add_exploration_noise):
"""
This is the call function for the Base Optimizer Class.
It is used to calculate the optimal solution for action at the current timestep given the current state.
Parameters
---------
current_state: tf.float32
Defines the current state of the system, (dims=num_of_agents X dim_S)
time_step: tf.float32
Defines the current timestep of the episode.
exploration_noise: tf.bool
Define if the optimal action should have some noise added to it before returning it.
Returns
-------
resulting_action: tf.float32
The optimal solution for the first action to be applied in the current time step.
next_state: tf.float32
The next state predicted using the dynamics model in the trajectory evaluator.
rewards_of_next_state: tf.float32
The predicted reward achieved after applying the action given by the optimizer.
"""
resulting_action = \
self._optimize(current_state, time_step)
if add_exploration_noise:
noise = tf.random.truncated_normal([self._num_agents, self._dim_U],
self._exploration_mean,
tf.sqrt(self._exploration_variance),
dtype=tf.float32)
resulting_action = resulting_action + noise
resulting_action = tf.clip_by_value(resulting_action,
self._action_lower_bound,
self._action_upper_bound)
next_state = self._trajectory_evaluator.predict_next_state(
current_state, resulting_action)
rewards_of_next_state = self._trajectory_evaluator.\
evaluate_next_reward(current_state, next_state, resulting_action)
return resulting_action, next_state, rewards_of_next_state
[docs] def reset(self):
"""
This method resets the optimizer to its default state at the
beginning of the trajectory/episode.
"""
raise Exception("reset function is not implemented yet")
[docs] def set_trajectory_evaluator(self, trajectory_evaluator):
"""
Sets the trajectory evaluator to be used by the optimizer.
:param trajectory_evaluator: (EvaluatorBaseClass) Defines the
trajectory evaluator to be used to evaluate the reward of a
sequence of actions.
:return:
"""
self._trajectory_evaluator = trajectory_evaluator
return