Source code for blackbox_mpc.utils.rollouts

import logging
logging.getLogger().setLevel(logging.INFO)
from blackbox_mpc.policies.model_free_base_policy import ModelFreeBasePolicy
import time
import tensorflow as tf
import numpy as np
from blackbox_mpc.policies.random_policy import RandomPolicy


def perform_rollouts(env, number_of_rollouts, task_horizon, policy,
                     exploration_noise=False, tf_writer=None,
                     start_episode=0):
    """
    Collect several fixed-length episodes with the given policy.

    Each episode is produced by one call to ``_sample``; the per-episode
    observations, actions and rewards are gathered into three parallel lists.

    Parameters
    ---------
    env: parallelgymEnv
        a wrapped gym environment using
        blackbox.environment_utils.EnvironmentWrapper funcs
    number_of_rollouts: Int
        Number of rollouts/ episodes to perform for each of the agents in the
        vectorized environment.
    task_horizon: Int
        The task horizon/ episode length.
    policy: ModelBasedBasePolicy or ModelFreeBasePolicy
        The policy used to pick the actions in every episode.
    exploration_noise: bool
        If noise should be added to the actions to help in exploration.
    tf_writer: tf.summary
        Tensorflow writer to be used in logging the data (None disables
        logging).
    start_episode: Int
        Episode index of the first rollout, for tensorflow logging purposes;
        rollout ``i`` is logged as episode ``start_episode + i``.

    Returns
    -------
    traj_obs: [np.ndarray]
        List with length=number_of_rollouts which holds the observations of
        each episode, starting from the reset observation.
    traj_acs: [np.ndarray]
        List with length=number_of_rollouts which holds the actions taken by
        the policy.
    traj_rews: [np.ndarray]
        List with length=number_of_rollouts which holds the rewards received
        in each episode.
    """
    logging.info("Started collecting samples for rollouts")
    episodes = [
        _sample(env,
                task_horizon,
                policy,
                episode_step=start_episode + offset,
                exploration_noise=exploration_noise,
                tf_writer=tf_writer)
        for offset in range(number_of_rollouts)
    ]
    # Split the per-episode dicts into three parallel lists.
    traj_obs = [episode["observations"] for episode in episodes]
    traj_acs = [episode["actions"] for episode in episodes]
    traj_rews = [episode["rewards"] for episode in episodes]
    logging.info("Finished collecting samples for rollout")
    return traj_obs, traj_acs, traj_rews
def _sample(env, horizon, policy, episode_step, exploration_noise=False,
            tf_writer=None):
    """
    Sample one episode of a fixed length with the provided policy.

    The policy and the environment are both reset first, then the policy is
    queried and the environment stepped exactly ``horizon`` times. The
    ``done`` flag returned by the environment is not consulted.

    Parameters
    ---------
    env: parallelgymEnv
        a wrapped gym environment using
        blackbox.environment_utils.EnvironmentWrapper funcs
    horizon: Int
        The task horizon/ episode length.
    policy: ModelBasedBasePolicy or ModelFreeBasePolicy
        The policy used to pick the actions. A policy that is not a
        ModelFreeBasePolicy is expected to return
        (action, expected observation, expected reward) from ``act``.
    episode_step: Int
        Index of this episode; used as the step of the episode-level scalars
        and, multiplied by ``horizon``, as the offset of the per-step scalars.
    exploration_noise: bool
        If noise should be added to the actions to help in exploration. Only
        forwarded to policies that are not a ModelFreeBasePolicy.
    tf_writer: tf.summary
        Tensorflow writer to be used in logging the data (None disables
        logging).

    Returns
    -------
    result: dict
        returns the episode rollouts results for all the agents in the
        parallelized environment, it has the form of
        {observations, actions, rewards, reward_sum}; ``observations`` holds
        the reset observation followed by one observation per step.
    """
    # NOTE(review): the block nesting below was reconstructed from a
    # flattened listing of this function -- confirm against the repository.
    policy.reset()
    observations = [env.reset()]
    actions, rewards, action_times = [], [], []
    reward_sum = 0
    predicted_return = 0

    uses_model = not isinstance(policy, ModelFreeBasePolicy)
    is_random = isinstance(policy, RandomPolicy)
    should_log = tf_writer is not None

    for step in range(horizon):
        tick = time.time()
        current_obs = observations[step]
        if uses_model:
            chosen, expected_obs, expected_reward = policy.act(
                current_obs, step, exploration_noise)
            predicted_return += expected_reward
        else:
            chosen = policy.act(current_obs, step)
        chosen = chosen.numpy()
        actions.append(chosen)
        # Only the action selection is timed, not the environment step.
        action_times.append(time.time() - tick)

        obs, reward, _done, _info = env.step(chosen)

        if should_log:
            global_step = (episode_step * horizon) + step
            if not is_random:
                with tf_writer.as_default():
                    tf.summary.scalar('rewards/actual_reward',
                                      np.mean(reward),
                                      step=global_step)
            if uses_model:
                # Compare the model's one-step predictions with what the
                # environment actually returned.
                obs_abs_error = np.sum(np.abs(expected_obs - obs), axis=1)
                with tf_writer.as_default():
                    tf.summary.scalar(
                        'states/predicted_observations_abs_error',
                        np.mean(obs_abs_error),
                        step=global_step)
                    tf.summary.scalar(
                        'rewards/predicted_reward_abs_error',
                        np.mean(np.abs(expected_reward - reward)),
                        step=global_step)

        observations.append(obs)
        rewards.append(reward)
        reward_sum += reward

    # Episode-level scalars are written once, after the final step, and only
    # if at least one step was actually taken.
    if should_log and actions:
        if not is_random:
            with tf_writer.as_default():
                tf.summary.scalar('rewards/actual_episode_reward',
                                  np.mean(reward_sum),
                                  step=episode_step)
        if uses_model:
            with tf_writer.as_default():
                tf.summary.scalar('rewards/predicted_episode_reward',
                                  np.mean(predicted_return),
                                  step=episode_step)

    mean_selection_time = np.mean(action_times)
    logging.info(f"Average action selection time: {mean_selection_time!s}")
    logging.info(f"Rollout length: {len(actions)}")

    return {
        "observations": np.array(observations),
        "actions": np.array(actions),
        "rewards": np.array(rewards),
        "reward_sum": reward_sum,
    }