utils

Utility functions for reinforcement learning agents that are used across different algorithms.

Functions

clamp_actions

Clamps actions to be within min and max bounds, handling scalar, list, or tensor bounds.

gaussian_noise

Generates Gaussian noise with specified mean and standard deviation.

generalized_advantage_estimates

Generalized Advantage Estimation (GAE) computes an advantage estimation that balances bias and variance.

hard_update

Updates a target network with the parameters of the provided network.

normalize_advantages

Normalizes advantages to have zero mean and unit variance.

ornstein_uhlenbeck_noise

Generates Ornstein-Uhlenbeck noise for exploration in continuous action spaces.

polyak_update

Updates a target network using Polyak averaging.

rewards_to_go

Computes the discounted rewards-to-go returns for a batch of trajectories.

set_seed

to_activation

Converts a string name to a PyTorch activation module.

trajectory_returns

Computes the discounted returns for a sequence of rewards, also known as total discounted return.

prt_rl.common.utils.clamp_actions(actions: Tensor, action_min: float | List[float] | Tensor, action_max: float | List[float] | Tensor) → Tensor

Clamps actions to be within min and max bounds, handling scalar, list, or tensor bounds.

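Parameters:
  • actions (torch.Tensor) – Actions to clamp, with shape (B, A)

  • action_min (float | List[float] | torch.Tensor) – Lower bound, given as a scalar, a list, or a tensor

  • action_max (float | List[float] | torch.Tensor) – Upper bound, given as a scalar, a list, or a tensor
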
Returns:

Clamped actions of shape (B, A)

Return type:

torch.Tensor
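The clamping rule can be sketched on plain Python lists. This is only an illustration of the arithmetic, not the library's implementation: the helper name `clamp_reference` is hypothetical, and it assumes that a scalar bound applies to every action dimension while a list bound supplies one value per dimension.

```python
from typing import List, Sequence, Union

Bound = Union[float, Sequence[float]]


def _expand(bound: Bound, action_dim: int) -> List[float]:
    """Turn a scalar or per-dimension bound into a list of length action_dim."""
    if isinstance(bound, (list, tuple)):
        if len(bound) != action_dim:
            raise ValueError("bound length must match the action dimension")
        return [float(b) for b in bound]
    return [float(bound)] * action_dim


def clamp_reference(
    actions: List[List[float]], action_min: Bound, action_max: Bound
) -> List[List[float]]:
    """Clamp a (B, A) batch of actions, dimension by dimension (hypothetical helper)."""
    action_dim = len(actions[0])
    lows = _expand(action_min, action_dim)
    highs = _expand(action_max, action_dim)
    return [
        [min(max(a, lo), hi) for a, lo, hi in zip(row, lows, highs)]
        for row in actions
    ]


# Scalar bounds apply to both dimensions; list bounds are per dimension.
scalar_clamped = clamp_reference([[2.0, -3.0]], -1.0, 1.0)
per_dim_clamped = clamp_reference([[2.0, -3.0]], [-1.0, -2.0], [1.0, 2.0])
```

With scalar bounds of -1 and 1 both entries saturate, while the per-dimension bounds let the second action reach -2.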

prt_rl.common.utils.gaussian_noise(mean: float = 0.0, std: float = 1.0, shape: Tuple[int, ...] = (1,), device: str = 'cpu') → Tensor

Generates Gaussian noise with specified mean and standard deviation.

Parameters:
  • mean (float) – Mean of the Gaussian distribution.

  • std (float) – Standard deviation of the Gaussian distribution.

  • shape (Tuple[int, ...]) – Shape of the output tensor.

  • device (str) – Device on which the output tensor is created.

Returns:

A tensor filled with Gaussian noise.

Return type:

torch.Tensor

prt_rl.common.utils.generalized_advantage_estimates(rewards: Tensor, values: Tensor, dones: Tensor, last_values: Tensor, gamma: float = 0.99, gae_lambda: float = 0.95) → Tuple[Tensor, Tensor]

Generalized Advantage Estimation (GAE) computes an advantage estimation that balances bias and variance.

The GAE is defined as:

\[A_t = \sum_{t'=t}^{\infty} (\gamma \lambda)^{t'-t} \delta_{t'}\]

where \(\delta_{t'} = r_{t'} + \gamma V(s_{t'+1}) - V(s_{t'})\).

When lambda is set to 1, this reduces to the Monte Carlo estimate of the advantage. When lambda is set to 0, it reduces to the one-step TD error.

Parameters:
  • rewards (torch.Tensor) – Rewards from rollout with shape (T, N, 1) or (B, 1)

  • values (torch.Tensor) – Estimated state values with shape (T, N, 1) or (B, 1)

  • dones (torch.Tensor) – Done flags (1 if episode ended at step t, else 0) with shape (T, N, 1) or (B, 1)

  • last_values (torch.Tensor) – Value estimates for final state (bootstrap) with shape (N, 1)

  • gamma (float) – Discount factor

  • gae_lambda (float) – GAE lambda

Returns:

  • Estimated advantages with shape matching rewards shape

  • TD(lambda) returns with shape matching rewards shape

Return type:

Tuple[torch.Tensor, torch.Tensor]
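The sum above is usually evaluated with a backward recursion, \(A_t = \delta_t + \gamma \lambda A_{t+1}\). The sketch below shows that recursion for a single environment on plain Python lists. It is an illustration under stated assumptions, not the library's code: `gae_reference` is a hypothetical name, a done flag at step t is assumed to zero both the bootstrap value and the carried advantage, and the returns are assumed to be the advantages plus the value estimates.

```python
from typing import List, Tuple


def gae_reference(
    rewards: List[float],
    values: List[float],
    dones: List[float],
    last_value: float,
    gamma: float = 0.99,
    gae_lambda: float = 0.95,
) -> Tuple[List[float], List[float]]:
    """Backward-recursive GAE for one environment (hypothetical helper)."""
    num_steps = len(rewards)
    advantages = [0.0] * num_steps
    running = 0.0  # holds A_{t+1} while walking backwards in time
    for t in reversed(range(num_steps)):
        # Bootstrap from last_value at the final step, else from V(s_{t+1}).
        next_value = last_value if t == num_steps - 1 else values[t + 1]
        not_done = 1.0 - dones[t]  # assumption: done at t cuts the episode here
        delta = rewards[t] + gamma * next_value * not_done - values[t]
        running = delta + gamma * gae_lambda * not_done * running
        advantages[t] = running
    # Assumption: TD(lambda) returns are advantages plus value estimates.
    returns = [a + v for a, v in zip(advantages, values)]
    return advantages, returns


# lambda = 1 gives the Monte Carlo advantage, lambda = 0 the one-step TD error.
mc_adv, mc_ret = gae_reference(
    [1.0, 1.0, 1.0], [0.5, 0.5, 0.5], [0.0, 0.0, 1.0], 0.0, gamma=0.9, gae_lambda=1.0
)
td_adv, _ = gae_reference(
    [1.0, 1.0, 1.0], [0.5, 0.5, 0.5], [0.0, 0.0, 1.0], 0.0, gamma=0.9, gae_lambda=0.0
)
```

For this three-step episode the lambda = 1 advantages are approximately (2.21, 1.4, 0.5), which is the discounted return minus the value estimate at each step, and the lambda = 0 advantages are approximately (0.95, 0.95, 0.5), the per-step TD errors.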

prt_rl.common.utils.hard_update(target: Module, network: Module) → None

Updates a target network with the parameters of the provided network.

This is a hard update where the parameters are directly copied from the network to the target. The parameters of the target network are updated in place.

\[\Theta_{target} = \Theta_{\pi}\]
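Parameters:
  • target (torch.nn.Module) – The target network to be updated.

  • network (torch.nn.Module) – The network from which parameters are copied.
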
prt_rl.common.utils.normalize_advantages(advantages: Tensor) → Tensor

Normalizes advantages to have zero mean and unit variance.

\[A_{norm} = \frac{A - \mu}{\sigma + \epsilon}\]

Parameters:

advantages (torch.Tensor) – The advantages to normalize. Shape (B, 1)

Returns:

The normalized advantages. Shape (B, 1)

Return type:

torch.Tensor
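The normalization can be checked by hand with a small list-based sketch. The name `normalize_reference`, the default `eps = 1e-8`, and the use of the population standard deviation are all assumptions made for illustration; the library may use a different epsilon or the sample standard deviation.

```python
import math
from typing import List


def normalize_reference(advantages: List[float], eps: float = 1e-8) -> List[float]:
    """Shift to zero mean and scale to unit variance (hypothetical helper)."""
    n = len(advantages)
    mean = sum(advantages) / n
    # Assumption: population variance (divide by n, no Bessel correction).
    variance = sum((a - mean) ** 2 for a in advantages) / n
    std = math.sqrt(variance)
    # eps keeps the division finite when all advantages are identical.
    return [(a - mean) / (std + eps) for a in advantages]


normalized = normalize_reference([1.0, 2.0, 3.0])
constant = normalize_reference([5.0, 5.0, 5.0])  # all zeros, no division by zero
```

The epsilon term matters mainly in the degenerate case where every advantage is equal and the standard deviation is zero.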

prt_rl.common.utils.ornstein_uhlenbeck_noise(mean: float = 0.0, std: float = 1.0, shape: Tuple[int, ...] = (1,), theta: float = 0.15, dt: float = 0.01, x0: Tensor | None = None) → Tensor

Generates Ornstein-Uhlenbeck noise for exploration in continuous action spaces.

Ornstein-Uhlenbeck noise is a stochastic process that models the velocity of a massive Brownian particle under the influence of friction. It is defined by the following stochastic differential equation:

\[dx_t = \theta (\mu - x_t) dt + \sigma dW_t\]

where \(\mu\) is the mean, \(\sigma\) is the standard deviation, and \(dW_t\) is a Wiener process.

This implementation uses the Euler-Maruyama method to discretize and approximate the process following the equation:

\[x_{t+1} = x_t + \theta (\mu - x_t) dt + \sigma \delta W_t\]

where \(\delta W_t \sim \mathcal{N}(0, \delta t) = \sqrt{\delta t}\mathcal{N}(0, 1)\).

Parameters:
  • mean (float) – Mean of the noise.

  • std (float) – Standard deviation of the noise.

  • shape (Tuple[int, ...]) – Shape of the output tensor. Supports (B, action_dim)

  • theta (float) – Rate of mean reversion.

  • dt (float) – Time step size.

  • x0 (Optional[torch.Tensor]) – Initial value or previous value for the noise process. (if None, it will be initialized to zeros)

Returns:

A tensor filled with Ornstein-Uhlenbeck noise.

Return type:

torch.Tensor

References

[1] http://math.stackexchange.com/questions/1287634/implementing-ornstein-uhlenbeck-in-matlab

[2] https://en.wikipedia.org/wiki/Ornstein%E2%80%93Uhlenbeck_process

[3] https://en.wikipedia.org/wiki/Euler%E2%80%93Maruyama_method
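A single Euler-Maruyama step of the process described above can be sketched with the standard library. The helper `ou_step` and its `rng` argument are hypothetical and only illustrate the update rule on a list of per-dimension values; the library function works on tensors.

```python
import math
import random
from typing import List, Optional


def ou_step(
    x_prev: List[float],
    mean: float = 0.0,
    std: float = 1.0,
    theta: float = 0.15,
    dt: float = 0.01,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """One Euler-Maruyama step of the OU process per dimension (hypothetical helper)."""
    if rng is None:
        rng = random.Random()
    sqrt_dt = math.sqrt(dt)
    return [
        # drift pulls x toward the mean; diffusion adds N(0, dt)-scaled noise
        x + theta * (mean - x) * dt + std * sqrt_dt * rng.gauss(0.0, 1.0)
        for x in x_prev
    ]


# Feed each output back in as the previous value to get temporally correlated noise.
rng = random.Random(0)
state = [0.0, 0.0]
trace = []
for _ in range(5):
    state = ou_step(state, rng=rng)
    trace.append(state)
```

Passing the previous output back in plays the role of the `x0` argument: each sample depends on the last one, which is what distinguishes this noise from independent Gaussian draws.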

prt_rl.common.utils.polyak_update(target: Module, network: Module, tau: float) → None

Updates a target network using Polyak averaging.

When tau is 0 the target is unchanged and when tau is 1 a hard update is performed. The parameters of the target network are updated in place.

\[\Theta_{target} = \tau * \Theta_{\pi} + (1 - \tau) * \Theta_{target}\]
Parameters:
  • target (torch.nn.Module) – The target network to be updated.

  • network (torch.nn.Module) – The policy, pi, network from which parameters are taken.

  • tau (float) – The interpolation factor, typically in the range [0, 1].

References: [1] DLR-RM/stable-baselines3#93
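The interpolation can be illustrated on a dictionary of scalar parameters. This is a stand-in for the arithmetic only: `polyak_reference` is a hypothetical helper, and real networks hold tensors in `torch.nn.Module` objects rather than floats in a dict.

```python
from typing import Dict


def polyak_reference(
    target: Dict[str, float], network: Dict[str, float], tau: float
) -> None:
    """Blend network parameters into target in place (hypothetical helper)."""
    for name, value in network.items():
        target[name] = tau * value + (1.0 - tau) * target[name]


target_params = {"w": 0.0, "b": 1.0}
network_params = {"w": 1.0, "b": 3.0}

# A small tau moves the target only slightly toward the network.
polyak_reference(target_params, network_params, tau=0.1)

# tau = 1 reproduces a hard update: the target becomes a copy of the network.
hard_copy = {"w": 0.0, "b": 1.0}
polyak_reference(hard_copy, network_params, tau=1.0)
```

After the first call the target holds roughly w = 0.1 and b = 1.2, one tenth of the way toward the network values.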

prt_rl.common.utils.rewards_to_go(rewards: Tensor, dones: Tensor, last_values: Tensor | None = None, gamma: float = 0.99) → Tensor

Computes the discounted rewards-to-go returns for a batch of trajectories. This function supports bootstrapping partial trajectories as well as flattened or time-major inputs.

The bootstrapped discounted rewards-to-go is defined as:

\[G_t = \sum_{t'=t}^{T-1} \gamma^{t'-t} r(s_{i,t'}, a_{i,t'}) + \gamma^{T-t} V(s_{i,T})\]

where \(r(s_{i,t'}, a_{i,t'})\) is the reward at time step \(t'\).

Parameters:
  • rewards (torch.Tensor) – Rewards from rollout with shape (T, N, 1) or (B, 1)

  • dones (torch.Tensor) – Done flags (1 if episode ended at step t, else 0) with shape (T, N, 1) or (B, 1)

  • last_values (Optional[torch.Tensor]) – Value estimates for final state (bootstrap) with shape (N, 1) or (1, 1). Required when the last state is not terminal; if omitted, 0 is assumed for the last value.

  • gamma (float) – Discount factor

Returns:

The rewards-to-go with shape that matches the input rewards shape.

Return type:

torch.Tensor
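For a flattened (B, 1) input, the definition above reduces to a backward recursion, \(G_t = r_t + \gamma G_{t+1}\), that restarts at every done flag. The sketch below uses plain lists; `rewards_to_go_reference` is a hypothetical name, and treating a done flag at step t as the end of that trajectory is an assumption about the convention.

```python
from typing import List


def rewards_to_go_reference(
    rewards: List[float],
    dones: List[float],
    last_value: float = 0.0,
    gamma: float = 0.99,
) -> List[float]:
    """Discounted rewards-to-go over stacked trajectories (hypothetical helper)."""
    out = [0.0] * len(rewards)
    running = last_value  # bootstrap value for a final, unfinished trajectory
    for t in reversed(range(len(rewards))):
        # A done flag at t discards everything accumulated from later steps.
        running = rewards[t] + gamma * running * (1.0 - dones[t])
        out[t] = running
    return out


# Two complete two-step trajectories stacked back to back.
stacked = rewards_to_go_reference([1.0, 1.0, 2.0, 2.0], [0.0, 1.0, 0.0, 1.0], gamma=0.9)

# One unfinished trajectory, bootstrapped with a value estimate of 10.
partial = rewards_to_go_reference([1.0, 1.0, 1.0], [0.0, 0.0, 0.0], last_value=10.0, gamma=0.5)
```

The stacked case gives approximately (1.9, 1.0, 3.8, 2.0): the first trajectory's values are unaffected by the second. The partial case gives (3.0, 4.0, 6.0), where the first entry equals 1 + 0.5 + 0.25 + 0.125 * 10.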

prt_rl.common.utils.set_seed(seed: int)

prt_rl.common.utils.to_activation(name: str) → Module

Converts a string name to a PyTorch activation module.

Parameters:

name (str) – Name of the activation function. Supported: “relu”, “tanh”.

Returns:

Corresponding PyTorch activation module.

Return type:

nn.Module

Raises:

ValueError – If the activation name is unknown.

prt_rl.common.utils.trajectory_returns(rewards: Tensor, dones: Tensor, last_values: Tensor | None = None, gamma: float = 0.99) → Tensor

Computes the discounted returns for a sequence of rewards, also known as total discounted return. This function supports bootstrapping partial trajectories as well as flattened or time-major inputs.

\[\sum_{t'=0}^{T-1} \gamma^{t'} r(s_{i,t'}, a_{i,t'}) + \gamma^{T} V(s_{i,T})\]

When arguments are passed in with shape (B, 1), they are assumed to be stacked trajectories, and only the last trajectory may be incomplete.

Parameters:
  • rewards (torch.Tensor) – Rewards from rollout with shape (T, N, 1) or (B, 1)

  • dones (torch.Tensor) – Done flags (1 if episode ended at step t, else 0) with shape (T, N, 1) or (B, 1)

  • last_values (Optional[torch.Tensor]) – Value estimates for final state (bootstrap) with shape (N, 1) or (1, 1). Required when the last state is not terminal; if omitted, 0 is assumed for the last value.

  • gamma (float) – Discount factor

Returns:

The returns with shape that matches the input rewards shape.

Return type:

torch.Tensor
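The stacked-trajectory case can be sketched on plain lists. Because the output shape matches the rewards, this sketch assumes that every step of a trajectory receives that trajectory's total discounted return; that broadcast, the name `trajectory_returns_reference`, and the done-flag convention are assumptions for illustration rather than confirmed library behavior.

```python
from typing import List


def trajectory_returns_reference(
    rewards: List[float],
    dones: List[float],
    last_value: float = 0.0,
    gamma: float = 0.99,
) -> List[float]:
    """Total discounted return per trajectory, repeated per step (hypothetical helper)."""
    num_steps = len(rewards)
    out = [0.0] * num_steps
    start = 0  # index of the first step of the current trajectory
    for t in range(num_steps):
        is_last = t == num_steps - 1
        if dones[t] or is_last:
            length = t - start + 1
            total = sum(gamma ** k * rewards[start + k] for k in range(length))
            if not dones[t]:
                # Only the final trajectory can be incomplete, so bootstrap it.
                total += gamma ** length * last_value
            for i in range(start, t + 1):
                out[i] = total
            start = t + 1
    return out


# Two complete trajectories followed by one unfinished step bootstrapped with 10.
returns = trajectory_returns_reference(
    [1.0, 1.0, 2.0, 2.0, 3.0], [0.0, 1.0, 0.0, 1.0, 0.0], last_value=10.0, gamma=0.5
)
```

With gamma = 0.5 the three trajectories have totals 1.5, 3.0, and 8.0, where the last one is the single reward of 3 plus the discounted bootstrap value 0.5 * 10.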