cross_entropy

Functions

prt_rl.model_based.planners.cross_entropy.temporal_smooth(x: Tensor, method: str = 'none', rho: float = 0.9, kernel_size: int = 0) → Tensor

Apply simple temporal smoothing along the horizon dimension.

Parameters:
  • x (torch.Tensor) – Tensor of shape (N, H, dA), where:

    • N: number of sequences/samples
    • H: planning horizon
    • dA: action dimension

  • method ({"none", "ou", "conv"}, default: "none") – Smoothing method:

    • "none": return x unchanged.
    • "ou": exponential moving average (Ornstein–Uhlenbeck-like) smoothing along the horizon: out[:, t] = rho * out[:, t-1] + (1 - rho) * x[:, t]
    • "conv": 1D convolution with a Gaussian-like kernel of length kernel_size.

  • rho (float, default: 0.9) – Smoothing factor for "ou". Higher values give smoother sequences (more inertia).

  • kernel_size (int, default: 0) – Kernel length for "conv". Must be >= 3 to have an effect.

Returns:

torch.Tensor

Smoothed tensor with the same shape as x (N, H, dA).

Notes

  • Smoothing should be applied in U-space for tanh bound mode (preferred), and in A-space for clip mode.

  • For "conv", edges are handled with "replicate" padding.
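
The two smoothing modes described above can be sketched as follows. This is an illustrative sketch, not the library's implementation: the function names `ou_smooth` and `conv_smooth` are hypothetical, the first time step is assumed to pass through unchanged in the "ou" case, and the Gaussian width and the rounding of even kernel sizes up to odd are assumptions made so that the output keeps length H.

```python
import torch
import torch.nn.functional as F


def ou_smooth(x: torch.Tensor, rho: float = 0.9) -> torch.Tensor:
    """Exponential moving average along the horizon axis of an (N, H, dA) tensor."""
    out = torch.empty_like(x)
    out[:, 0] = x[:, 0]  # assumption: the first step is passed through unchanged
    for t in range(1, x.shape[1]):
        out[:, t] = rho * out[:, t - 1] + (1.0 - rho) * x[:, t]
    return out


def conv_smooth(x: torch.Tensor, kernel_size: int = 5) -> torch.Tensor:
    """Gaussian-like 1D convolution along the horizon with replicate padding."""
    if kernel_size < 3:
        return x  # too short to have an effect
    # assumption: use an odd kernel so the output length stays H
    k = kernel_size if kernel_size % 2 == 1 else kernel_size + 1
    half = k // 2
    taps = torch.arange(-half, half + 1, dtype=x.dtype, device=x.device)
    sigma = k / 4.0  # hypothetical width choice
    kernel = torch.exp(-0.5 * (taps / sigma) ** 2)
    kernel = kernel / kernel.sum()  # normalize so constant inputs are preserved
    n, h, d = x.shape
    y = x.permute(0, 2, 1).reshape(n * d, 1, h)   # (N * dA, 1, H)
    y = F.pad(y, (half, half), mode="replicate")  # repeat edge values
    y = F.conv1d(y, kernel.view(1, 1, k))         # (N * dA, 1, H)
    return y.reshape(n, d, h).permute(0, 2, 1)    # back to (N, H, dA)
```

Both helpers keep the (N, H, dA) shape, and a constant sequence is returned unchanged because the weights in each case sum to one.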

Classes

  • ClipBound – Hard clipping strategy (simple & useful for bang-bang optima).

  • CrossEntropyMethodPlanner – Cross-Entropy Method (CEM) planner for continuous control with support for tanh-squash (U-space) and clip (A-space) bounding strategies.

  • TanhSquashBound – Tanh squashing strategy (recommended default).

class prt_rl.model_based.planners.cross_entropy.ClipBound

Hard clipping strategy (simple & useful for bang-bang optima).

The distribution lives and is refit in A-space. Sampling:

    A ~ Normal(mu_a, sigma_a)            (shape (H, dA))
    A := clamp(A, [a_min, a_max])

Shapes

  • a_mins, a_maxs : (dA, 1) or (dA,)

  • distribution.loc/scale : (H, dA)

  • samples / elites : (N, H, dA)

static cold_start(H: int, a_mins: Tensor, a_maxs: Tensor, beta: float, tau: float, std_min: float = 1e-06) → Normal

Initialize an A-space Normal with center-of-box mean and decayed half-span std.

sigma_a(t) = (beta + (1 - beta) * exp(-t/tau)) * (a_max - a_min)/2

Returns:

A-space Normal (H, dA).

Return type:

Normal
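
A minimal sketch of this initialization, assuming t is the step index 0, …, H-1 and that the floor std_min is applied after the decay. The function name `clip_cold_start_sketch` is hypothetical and the code is not taken from the library.

```python
import torch
from torch.distributions import Normal


def clip_cold_start_sketch(H, a_mins, a_maxs, beta, tau, std_min=1e-6):
    """Center-of-box mean with a decayed half-span std, both of shape (H, dA)."""
    a_mins = a_mins.reshape(-1)  # accept (dA, 1) or (dA,)
    a_maxs = a_maxs.reshape(-1)
    center = 0.5 * (a_mins + a_maxs)     # (dA,)
    half_span = 0.5 * (a_maxs - a_mins)  # (dA,)
    # assumption: t indexes the planning steps 0, ..., H-1
    t = torch.arange(H, dtype=a_mins.dtype, device=a_mins.device)
    decay = beta + (1.0 - beta) * torch.exp(-t / tau)  # (H,), equals 1 at t = 0
    loc = center.expand(H, -1).clone()                 # (H, dA)
    scale = (decay[:, None] * half_span[None, :]).clamp_min(std_min)
    return Normal(loc, scale)
```

At t = 0 the std equals the full half-span, and for large t it approaches beta times the half-span.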

static sample(distribution: Normal, shape: Size, a_mins: Tensor, a_maxs: Tensor, smoothing: str = 'ou', rho: float = 0.5, kernel_size: int = 0) → Tensor

Sample actions from an A-space Normal, optionally smooth in A, then clamp to bounds.

Returns:

A-space actions with shape (N, H, dA), clamped to [a_mins, a_maxs].

Return type:

torch.Tensor
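
The sample-then-clamp step can be sketched as below. The helper name `clip_sample_sketch` is hypothetical, and the optional smoothing step is omitted to keep the example short.

```python
import torch
from torch.distributions import Normal


def clip_sample_sketch(distribution, shape, a_mins, a_maxs):
    """Draw (N, H, dA) actions from an (H, dA) Normal and clamp them to the box."""
    a = distribution.sample(shape)  # shape = torch.Size((N,)) gives (N, H, dA)
    lo = a_mins.reshape(1, 1, -1)   # broadcast over N and H
    hi = a_maxs.reshape(1, 1, -1)
    return torch.maximum(torch.minimum(a, hi), lo)


# Usage: a deliberately wide distribution so that many samples hit the bounds.
dist = Normal(torch.zeros(5, 2), 10.0 * torch.ones(5, 2))
actions = clip_sample_sketch(
    dist, torch.Size((64,)), torch.tensor([-1.0, 0.0]), torch.tensor([1.0, 2.0])
)
```

With a wide distribution most samples land exactly on a bound, which is why clipping suits bang-bang optima.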

static warm_start(elites: Tensor, a_mins: Tensor, a_maxs: Tensor, widening_factor=1.5, std_min=1e-06) → Normal

Warm-start the A-space Normal from previous elites.

Steps

  1. Compute mean/std across elites.

  2. Shift mu_a, sigma_a forward by one time-step.

  3. Tail:

     • "repeat": use last mean/std
     • "center": set last mean to center of box (keeps last std)

  4. Widen sigma_a[0] by widening_factor.

  5. (Optional) Anchor mu_a[0] toward last executed action with convex blend.

Parameters:
  • elites (torch.Tensor) – Elite A-space action sequences from the previous iteration, shape (N_e, H, dA).

  • a_mins (torch.Tensor) – Shape (dA,1) or (dA,). Not used; part of the interface only.

  • a_maxs (torch.Tensor) – Shape (dA,1) or (dA,). Not used; part of the interface only.

  • widening_factor (float, default: 1.5) – Multiplier for sigma_a at t=0 after the shift.

  • std_min (float, default: 1e-6) – Std floor.

Returns:

Warm-started A-space Normal (H, dA).

Return type:

Normal
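
Steps 1 to 4 can be sketched as follows for the "repeat" tail. The name `clip_warm_start_sketch` is hypothetical, and the use of the population std and the order of widening and flooring are assumptions rather than documented behavior.

```python
import torch
from torch.distributions import Normal


def clip_warm_start_sketch(elites, widening_factor=1.5, std_min=1e-6):
    """Shift the elite statistics one step forward, using the 'repeat' tail."""
    mu = elites.mean(dim=0)                    # (H, dA)
    sigma = elites.std(dim=0, unbiased=False)  # (H, dA), population std (assumption)
    # Drop step 0 (already executed) and repeat the last step as the new tail.
    mu = torch.cat([mu[1:], mu[-1:]], dim=0)
    sigma = torch.cat([sigma[1:], sigma[-1:]], dim=0)
    sigma[0] = sigma[0] * widening_factor      # widen the new first step
    return Normal(mu, sigma.clamp_min(std_min))
```

After the shift, the plan for step t+1 from the previous call becomes the plan for step t of the next call.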

class prt_rl.model_based.planners.cross_entropy.CrossEntropyMethodPlanner(action_mins: Tensor, action_maxs: Tensor, num_action_sequences: int = 100, planning_horizon: int = 10, num_elites: int = 10, num_iterations: int = 5, use_smoothing: bool = False, use_clipping: bool = False, tau: float | None = None, beta: float = 0.2, device: str = 'cpu')

Cross-Entropy Method (CEM) planner for continuous control with support for tanh-squash (U-space) and clip (A-space) bounding strategies.

Workflow per planning call

  1. Initialize or warm-start the sequence distribution (shape (H, dA)).

  2. Repeat for K iterations:

     a) Sample N sequences (N, H, dA) using the bound strategy.
     b) Roll out through the (known or learned) dynamics model.
     c) Compute the reward for each sequence and pick the top M (elites).
     d) Refit the distribution from the elites (in the proper space).

  3. Return the first action of the best-scoring elite.
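
The workflow above can be sketched on a toy problem. Everything in this example is an assumption made for illustration: the function name `cem_plan_sketch`, the integrator dynamics s' = s + a, the quadratic reward, and the clip-style bounding. It does not call the planner class and is not its implementation.

```python
import torch


def cem_plan_sketch(state, a_min, a_max, N=200, H=5, M=20, K=5, seed=0):
    """Toy CEM loop: drive an integrator s' = s + a toward the origin."""
    gen = torch.Generator().manual_seed(seed)
    dA = a_min.numel()
    lo, hi = a_min.reshape(1, 1, dA), a_max.reshape(1, 1, dA)
    # 1. Cold start: center-of-box mean, half-span std, both (H, dA).
    mu = (0.5 * (lo + hi)).reshape(1, dA).repeat(H, 1)
    sigma = (0.5 * (hi - lo)).reshape(1, dA).repeat(H, 1)
    for _ in range(K):
        # a) Sample N sequences and clamp them to the bounds.
        eps = torch.randn(N, H, dA, generator=gen)
        actions = torch.maximum(torch.minimum(mu + sigma * eps, hi), lo)
        # b) Roll out the toy dynamics: states are cumulative sums of actions.
        states = state.reshape(1, 1, -1) + actions.cumsum(dim=1)  # (N, H, dA)
        # c) Higher reward is better; topk returns indices sorted best-first.
        rewards = -(states ** 2).sum(dim=(1, 2))                  # (N,)
        elite_idx = rewards.topk(M, largest=True).indices
        elites = actions[elite_idx]                               # (M, H, dA)
        # d) Refit the distribution from the elites.
        mu = elites.mean(dim=0)
        sigma = elites.std(dim=0, unbiased=False).clamp_min(1e-6)
    # 3. First action of the best-scoring elite, shape (1, dA).
    return elites[0, 0].unsqueeze(0)


first_action = cem_plan_sketch(
    torch.tensor([0.8]), torch.tensor([-1.0]), torch.tensor([1.0])
)
```

Starting from s = 0.8, the returned action should be negative, since moving toward the origin raises the reward.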

Parameters:
  • action_mins (torch.Tensor) – Lower action bounds with shape (dA, 1) or (dA,). Broadcast internally.

  • action_maxs (torch.Tensor) – Upper action bounds with shape (dA, 1) or (dA,). Broadcast internally.

  • num_action_sequences (int, default: 100) – N, number of sequences sampled per iteration.

  • planning_horizon (int, default: 10) – H, number of steps in each sequence.

  • num_elites (int, default: 10) – M, number of top sequences used for the refit.

  • num_iterations (int, default: 5) – K, number of CEM refinement iterations per plan call.

  • use_smoothing (bool, default: False) – If True, apply temporal smoothing (OU) inside the bound strategy (U-space for tanh, A-space for clip).

  • use_clipping (bool, default: False) – If True, use ClipBound; otherwise use TanhSquashBound.

  • tau (float or None, default: None) – Time constant for the std decay schedule. If None, H/3 is used.

  • beta (float, default: 0.2) – Long-horizon std floor fraction for the decay schedule.

  • device ({"cpu", "cuda", …}, default: "cpu") – Device for internal tensors.

Notes

  • This implementation assumes higher reward is better. If you use costs, either flip the sign or use largest=False in topk.

  • rollout_action_sequence(model_config, model_fcn, state, actions) must return a dict with ‘state’, ‘action’, and ‘next_state’ batches consistent with shapes (N, H, ·).

plan(model_fcn: Callable, model_config: Any, reward_fcn: Callable, state: Tensor) → Tensor

Run one CEM planning call and return the first action to execute.

Parameters:
  • model_fcn (Callable) – One-step dynamics function (batched) used by the rollout utility.

  • model_config (Any) – Additional config passed to your rollout helper.

  • reward_fcn (Callable) – Function computing rewards from rollout dict; returns (N,) reward per sequence.

  • state (torch.Tensor) – Current state (batching left to caller/rollout helper).

Returns:

First action of the best elite sequence, shape (1, dA).

Return type:

torch.Tensor

Notes

  • Sampling returns A-space actions in both strategies.

  • Refit is done in U or A space depending on the bound strategy.

class prt_rl.model_based.planners.cross_entropy.TanhSquashBound

Tanh squashing strategy (recommended default).

The underlying distribution lives in U-space (unbounded). Sampling:

    U ~ Normal(mu_u, sigma_u)                            (shape (H, dA))
    A = (tanh(U) + 1)/2 * (a_max - a_min) + a_min        (shape (N, H, dA), in bounds)

Refit must be done in U-space. We therefore convert A-space elites back to U-space via an atanh-like transform and compute the new Normal in U.

All methods here are stateless utilities; the planner owns the current distribution.

Shapes

  • a_mins, a_maxs : (dA, 1) or (dA,)

  • distribution.loc/scale : (H, dA)

  • samples / elites : (N, H, dA)
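
The squashing map and its inverse can be sketched as below. The names `squash` and `unsquash` are hypothetical, and the small `eps` clamp before `atanh` is an assumption added so that actions sitting exactly on a bound do not map to infinity. The library's "atanh-like" transform may differ in detail.

```python
import torch
from torch.distributions import Normal


def squash(u, a_mins, a_maxs):
    """Map unbounded U-space values into [a_min, a_max] per action dimension."""
    lo, hi = a_mins.reshape(-1), a_maxs.reshape(-1)
    return (torch.tanh(u) + 1.0) / 2.0 * (hi - lo) + lo


def unsquash(a, a_mins, a_maxs, eps=1e-6):
    """Inverse map from A-space back to U-space."""
    lo, hi = a_mins.reshape(-1), a_maxs.reshape(-1)
    y = 2.0 * (a - lo) / (hi - lo) - 1.0  # rescale to [-1, 1]
    y = y.clamp(-1.0 + eps, 1.0 - eps)    # assumption: keep atanh finite at the bounds
    return torch.atanh(y)


# Usage: sample in U-space, squash to A-space, and map back.
a_mins = torch.tensor([-1.0, 0.0], dtype=torch.float64)
a_maxs = torch.tensor([1.0, 4.0], dtype=torch.float64)
dist_u = Normal(torch.zeros(3, 2, dtype=torch.float64), 0.6 * torch.ones(3, 2, dtype=torch.float64))
u = dist_u.sample(torch.Size((8,)))  # (N, H, dA) = (8, 3, 2)
a = squash(u, a_mins, a_maxs)        # in bounds by construction
u_back = unsquash(a, a_mins, a_maxs)
```

Because elites are recovered through this inverse map, refitting the mean and std in U-space keeps the sampled actions inside the bounds without clipping.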

static cold_start(H: int, a_mins: Tensor, a_maxs: Tensor, beta: float, tau: float, sigma_u0: float = 0.6, std_min: float = 1e-06) → Normal

Initialize the U-space Normal used for tanh squashing.

Parameters:
  • H (int) – Planning horizon.

  • a_mins (torch.Tensor) – Bounds with shape (dA,1) or (dA,). Used only for dtype/device; U-space init is centered at zero regardless of bounds.

  • a_maxs (torch.Tensor) – Bounds with shape (dA,1) or (dA,). Used only for dtype/device; U-space init is centered at zero regardless of bounds.

  • beta (float) – Long-horizon std decay floor in [0,1]. Effective std(t) = (beta + (1-beta) * exp(-t/tau)) * sigma_u0.

  • tau (float) – Decay time constant (in steps).

  • sigma_u0 (float, default: 0.6) – Initial U-space std per dimension at t=0 (before decay). Values in [0.4, 1.0] are robust.

  • std_min (float, default: 1e-6) – Absolute std floor for numerical stability.

Returns:

U-space Normal with loc/scale shape (H, dA).

Return type:

torch.distributions.Normal

static refit(elites: Tensor, a_mins: Tensor, a_maxs: Tensor, std_min: float = 1e-06) → Normal

Refit the U-space Normal from A-space elites.

Parameters:
  • elites (torch.Tensor) – Elite action sequences in A-space, shape (N_e, H, dA).

  • a_mins (torch.Tensor) – Bounds with shape (dA,1) or (dA,).

  • a_maxs (torch.Tensor) – Bounds with shape (dA,1) or (dA,).

  • std_min (float, default: 1e-6) – Std floor.

Returns:

Updated U-space Normal with loc/scale (H, dA).

Return type:

Normal

static sample(distribution: Normal, shape: Size, a_mins: Tensor, a_maxs: Tensor, smoothing: str = 'ou', rho: float = 0.5, kernel_size: int = 0) → Tensor

Sample actions from a U-space Normal, optionally smooth in U, and squash to A-space.

Parameters:
  • distribution (Normal) – U-space Normal with loc/scale shape (H, dA).

  • shape (torch.Size) – Leading sample shape, e.g., (N,) to get (N,H,dA).

  • a_mins (torch.Tensor) – Bounds, shape (dA,1) or (dA,).

  • a_maxs (torch.Tensor) – Bounds, shape (dA,1) or (dA,).

  • smoothing ({"none", "ou", "conv"}, default: "ou") – Smoothing applied in U-space before squashing.

  • rho (float, default: 0.5) – OU smoothing factor.

  • kernel_size (int, default: 0) – Convolution kernel length (only used if smoothing="conv").

Returns:

Actions in A-space with shape (N, H, dA).

Return type:

torch.Tensor

static warm_start(elites: Tensor, a_mins: Tensor, a_maxs: Tensor, widening_factor: float = 1.3, std_min: float = 1e-06) → Normal

Warm-start the U-space Normal from previous elites in A-space.

Steps

  1. Convert elites A -> U, compute mean/std across elite batch.

  2. Shift μ_u, σ_u forward by one time-step.

  3. Tail: set last μ_u to 0 (center in U), keep last σ_u.

  4. Widen σ_u[0] by widening_factor to retain agility.

  5. (Optional) Anchor μ_u[0] toward the last executed action (converted to U) with convex blend μ_u[0] ← λ * μ_u[0] + (1-λ) * u_exec.

Parameters:
  • elites (torch.Tensor) – Elite A-space action sequences from the previous iteration, shape (N_e, H, dA).

  • a_mins (torch.Tensor) – Bounds, shape (dA,1) or (dA,).

  • a_maxs (torch.Tensor) – Bounds, shape (dA,1) or (dA,).

  • widening_factor (float, default: 1.3) – Multiplier for σ_u at t=0 after the shift.

  • std_min (float, default: 1e-6) – Std floor.

  • executed_action (shape (1, dA) or (dA,), optional) – Last executed action to anchor to (A-space).

  • anchor_lambda (float, default: 0.8) – Blend weight; larger values rely more on the shifted mean.

Returns:

Warm-started U-space Normal (H, dA).

Return type:

Normal
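
The five steps can be sketched as follows. The function name `tanh_warm_start_sketch` is hypothetical. The arguments `executed_action` and `anchor_lambda` follow the parameter descriptions rather than the signature shown for this method, and the `eps` clamp and the population std are assumptions. This is a sketch of the documented steps, not the library's code.

```python
import torch
from torch.distributions import Normal


def tanh_warm_start_sketch(elites, a_mins, a_maxs, widening_factor=1.3,
                           std_min=1e-6, executed_action=None,
                           anchor_lambda=0.8, eps=1e-6):
    """Warm-start a U-space Normal (H, dA) from A-space elites (N_e, H, dA)."""
    lo, hi = a_mins.reshape(-1), a_maxs.reshape(-1)

    def to_u(a):
        # A -> U: rescale to [-1, 1], keep atanh finite, then invert the tanh.
        y = (2.0 * (a - lo) / (hi - lo) - 1.0).clamp(-1.0 + eps, 1.0 - eps)
        return torch.atanh(y)

    # 1. Convert the elites to U-space and compute statistics across the batch.
    u = to_u(elites)
    mu, sigma = u.mean(dim=0), u.std(dim=0, unbiased=False)
    # 2-3. Shift forward one step; the tail mean is 0, the tail std is kept.
    mu = torch.cat([mu[1:], torch.zeros_like(mu[-1:])], dim=0)
    sigma = torch.cat([sigma[1:], sigma[-1:]], dim=0)
    # 4. Widen the first step.
    sigma[0] = sigma[0] * widening_factor
    # 5. Optional convex blend toward the last executed action (in U-space).
    if executed_action is not None:
        u_exec = to_u(executed_action.reshape(-1))
        mu[0] = anchor_lambda * mu[0] + (1.0 - anchor_lambda) * u_exec
    return Normal(mu, sigma.clamp_min(std_min))
```

With `anchor_lambda = 0.8`, the new first-step mean keeps 80% of the shifted mean and takes 20% from the executed action.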