cross_entropy#
Functions#
- prt_rl.model_based.planners.cross_entropy.temporal_smooth(x: Tensor, method: str = 'none', rho: float = 0.9, kernel_size: int = 0) → Tensor[source]#
Apply simple temporal smoothing along the horizon dimension.
- Parameters:
x (torch.Tensor) – Tensor of shape (N, H, dA), where N is the number of sequences/samples, H is the planning horizon, and dA is the action dimension.
method ({"none", "ou", "conv"}, default: "none") – Smoothing method:
- "none": return x unchanged.
- "ou": exponential moving average (Ornstein–Uhlenbeck-like) smoothing, out[:, t] = rho * out[:, t-1] + (1 - rho) * x[:, t].
- "conv": 1D convolution with an approximately Gaussian kernel of length kernel_size.
rho (float, default: 0.9) – Smoothing factor for "ou". Higher values give smoother sequences (more inertia).
kernel_size (int, default: 0) – Kernel length for "conv". Must be >= 3 to have an effect.
- Returns:
- torch.Tensor
Smoothed tensor with the same shape as x (N, H, dA).
Notes
- Smoothing should be applied in U-space for tanh bound mode (preferred) and in A-space for clip mode.
- For "conv", edges are handled with 'replicate' padding.
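The "ou" recurrence and the "conv" edge handling can be illustrated on a single 1-D sequence. The following is a plain-Python sketch, not the library code: the helper names `ou_smooth` and `conv_smooth`, the initial condition out[0] = x[0], and the kernel width heuristic are all assumptions made for illustration. The library itself operates on (N, H, dA) tensors.

```python
import math

def ou_smooth(x, rho=0.9):
    """EMA along the horizon: out[t] = rho * out[t-1] + (1 - rho) * x[t]."""
    if not x:
        return []
    out = [x[0]]  # assumption: the first step is passed through unchanged
    for t in range(1, len(x)):
        out.append(rho * out[-1] + (1.0 - rho) * x[t])
    return out

def conv_smooth(x, kernel_size=3):
    """Gaussian-weighted moving average with 'replicate' edge padding."""
    if kernel_size < 3 or not x:
        return list(x)  # kernels shorter than 3 have no effect
    half = kernel_size // 2  # assumption: even sizes round up to the next odd length
    sigma = kernel_size / 4.0  # assumption: hypothetical kernel width heuristic
    weights = [math.exp(-0.5 * (k / sigma) ** 2) for k in range(-half, half + 1)]
    total = sum(weights)
    weights = [w / total for w in weights]  # normalize so constants are preserved
    # Replicate padding: repeat the first and last values at the edges.
    padded = [x[0]] * half + list(x) + [x[-1]] * half
    return [
        sum(w * padded[t + k] for k, w in enumerate(weights))
        for t in range(len(x))
    ]

step = [0.0, 0.0, 1.0, 1.0, 1.0]
print(ou_smooth(step, rho=0.5))
print(conv_smooth(step, kernel_size=3))
```

Both helpers keep the sequence length unchanged, which mirrors the documented guarantee that the output has the same shape as x.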
Classes#
ClipBound – Hard clipping strategy (simple & useful for bang-bang optima).
CrossEntropyMethodPlanner – Cross-Entropy Method (CEM) planner for continuous control with support for tanh-squash (U-space) and clip (A-space) bounding strategies.
TanhSquashBound – Tanh squashing strategy (recommended default).
- class prt_rl.model_based.planners.cross_entropy.ClipBound[source]#
Hard clipping strategy (simple & useful for bang-bang optima).
- The distribution lives and is refit in A-space. Sampling:
A ~ Normal(mu_a, sigma_a) (shape (H, dA))
A := clamp(A, [a_min, a_max])
Shapes#
a_mins, a_maxs : (dA, 1) or (dA,)
distribution.loc/scale : (H, dA)
samples / elites : (N, H, dA)
- static cold_start(H: int, a_mins: Tensor, a_maxs: Tensor, beta: float, tau: float, std_min: float = 1e-06) → Normal[source]#
Initialize an A-space Normal with center-of-box mean and decayed half-span std.
sigma_a(t) = (beta + (1 - beta) * exp(-t/tau)) * (a_max - a_min)/2
- Returns:
A-space Normal (H, dA).
- Return type:
Normal
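The schedule sigma_a(t) above can be tabulated for a single action dimension. This is a minimal sketch in plain Python; the helper name `clip_cold_start` and the way std_min is applied (as a simple lower bound on each entry) are assumptions, not the library's confirmed behavior.

```python
import math

def clip_cold_start(H, a_min, a_max, beta, tau, std_min=1e-6):
    """Return (mu, sigma) lists of length H for one action dimension."""
    center = 0.5 * (a_min + a_max)     # center-of-box mean
    half_span = 0.5 * (a_max - a_min)  # std scale at t = 0
    mu = [center] * H
    sigma = [
        # Decays from half_span at t = 0 toward beta * half_span as t grows.
        max((beta + (1.0 - beta) * math.exp(-t / tau)) * half_span, std_min)
        for t in range(H)
    ]
    return mu, sigma

mu, sigma = clip_cold_start(H=5, a_min=-2.0, a_max=2.0, beta=0.2, tau=5 / 3)
for t, s in enumerate(sigma):
    print(t, round(s, 4))
```

Early steps start with the full half-span as std, while later steps shrink toward the fraction beta of it, so exploration is widest for the actions that are executed soonest.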
- static sample(distribution: Normal, shape: Size, a_mins: Tensor, a_maxs: Tensor, smoothing: str = 'ou', rho: float = 0.5, kernel_size: int = 0) → Tensor[source]#
Sample actions from an A-space Normal, optionally smooth in A, then clamp to bounds.
- Returns:
A-space actions with shape (N, H, dA), clamped to [a_mins, a_maxs].
- Return type:
Tensor
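The sample-then-clamp step can be sketched for one action dimension as follows. The helper name `clip_sample` and the seeded generator are hypothetical, and smoothing is left out to keep the sketch short.

```python
import random

def clip_sample(mu, sigma, n, a_min, a_max, seed=0):
    """Draw n sequences from per-step Normals, then clamp each action to the bounds."""
    rng = random.Random(seed)  # seeded only to make the sketch reproducible
    samples = []
    for _ in range(n):
        seq = [rng.gauss(m, s) for m, s in zip(mu, sigma)]
        # Hard clipping: out-of-range draws pile up exactly on the bounds.
        samples.append([min(max(a, a_min), a_max) for a in seq])
    return samples

batch = clip_sample(mu=[0.0] * 4, sigma=[1.0] * 4, n=8, a_min=-0.5, a_max=0.5)
print(len(batch), len(batch[0]))
```

Because clipped draws land exactly on a_min or a_max, this strategy places probability mass on the bounds themselves, which is why it suits bang-bang optima.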
- static warm_start(elites: Tensor, a_mins: Tensor, a_maxs: Tensor, widening_factor=1.5, std_min=1e-06) → Normal[source]#
Warm-start the A-space Normal from previous elites.
Steps#
1. Compute mean/std across elites.
2. Shift mu_a, sigma_a forward by one time-step.
3. Tail:
   - "repeat": use last mean/std
   - "center": set last mean to center of box (keeps last std)
4. Widen sigma_a[0] by widening_factor.
5. (Optional) Anchor mu_a[0] toward last executed action with convex blend.
- Parameters:
elites (torch.Tensor) – Elite A-space action sequences from the previous iteration, shape (N_e, H, dA).
a_mins (torch.Tensor) – Bounds with shape (dA,1) or (dA,). Not used by this method; accepted only as part of the interface.
a_maxs (torch.Tensor) – Bounds with shape (dA,1) or (dA,). Not used by this method; accepted only as part of the interface.
widening_factor (float, default: 1.5) – Multiplier for sigma_a at t=0 after the shift.
std_min (float, default: 1e-6) – Std floor.
- Returns:
Warm-started A-space Normal with loc/scale shape (H, dA).
- Return type:
Normal
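Steps 1 to 4 can be sketched for one action dimension with the "repeat" tail. This is an illustration under stated assumptions: the helper name `clip_warm_start` is hypothetical, the population standard deviation is used, and the std floor is applied after widening. The library may differ on each of these points.

```python
import statistics

def clip_warm_start(elites, widening_factor=1.5, std_min=1e-6):
    """elites: list of N_e sequences, each a list of H floats (one action dim)."""
    H = len(elites[0])
    # 1. Per-time-step mean/std across the elite batch.
    mu = [statistics.fmean([e[t] for e in elites]) for t in range(H)]
    sigma = [statistics.pstdev([e[t] for e in elites]) for t in range(H)]
    # 2-3. Shift forward by one step; the "repeat" tail copies the last entry.
    mu = mu[1:] + [mu[-1]]
    sigma = sigma[1:] + [sigma[-1]]
    # 4. Widen the first step, then apply the std floor (assumed order).
    sigma[0] *= widening_factor
    sigma = [max(s, std_min) for s in sigma]
    return mu, sigma

elites = [[0.0, 1.0, 2.0], [2.0, 3.0, 4.0]]
print(clip_warm_start(elites))
```

The shift reuses the previous plan from its second step onward, which matches receding-horizon control where the first planned action has already been executed.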
- class prt_rl.model_based.planners.cross_entropy.CrossEntropyMethodPlanner(action_mins: Tensor, action_maxs: Tensor, num_action_sequences: int = 100, planning_horizon: int = 10, num_elites: int = 10, num_iterations: int = 5, use_smoothing: bool = False, use_clipping: bool = False, tau: float | None = None, beta: float = 0.2, device: str = 'cpu')[source]#
Cross-Entropy Method (CEM) planner for continuous control with support for tanh-squash (U-space) and clip (A-space) bounding strategies.
Workflow per planning call#
1. Initialize or warm-start the sequence distribution (shape (H, dA)).
2. Repeat for K iterations:
   a) Sample N sequences (N, H, dA) using the bound strategy.
   b) Roll out through the (known or learned) dynamics model.
   c) Compute the reward for each sequence and pick the top M (elites).
   d) Refit the distribution from the elites (in the proper space).
3. Return the first action of the best-scoring elite.
- Parameters:
action_mins (torch.Tensor) – Lower action bounds with shape (dA, 1) or (dA,). Broadcast internally.
action_maxs (torch.Tensor) – Upper action bounds with shape (dA, 1) or (dA,). Broadcast internally.
num_action_sequences (int, default: 100) – N, number of sequences sampled per iteration.
planning_horizon (int, default: 10) – H, number of steps in each sequence.
num_elites (int, default: 10) – M, number of top sequences used for refit.
num_iterations (int, default: 5) – K, number of CEM refinement iterations per plan call.
use_smoothing (bool, default: False) – If True, apply temporal smoothing (OU) inside the bound strategy (U-space for tanh, A-space for clip).
use_clipping (bool, default: False) – If True, use ClipBound; otherwise use TanhSquashBound.
tau (float or None, default: None, which selects H/3) – Time constant for the std decay schedule.
beta (float, default: 0.2) – Long-horizon std floor fraction for the decay schedule.
device ({"cpu", "cuda", …}, default: "cpu") – Device for internal tensors.
Notes
- This implementation assumes higher reward is better. If you work with costs, either flip the sign or use largest=False in topk.
- rollout_action_sequence(model_config, model_fcn, state, actions) must return a dict with 'state', 'action', and 'next_state' batches consistent with shapes (N, H, ·).
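The per-call workflow can be sketched end to end on a toy problem. This is not the planner's implementation: it is plain Python for a scalar state and scalar action, uses clip bounding, assumes known dynamics s' = s + a, and tracks the best elite across all iterations (the source does not say whether the best elite is taken from the final iteration only). The name `cem_plan` and its `target` argument are hypothetical.

```python
import random
import statistics

def cem_plan(state, target, a_min=-1.0, a_max=1.0, N=100, H=5, M=10, K=5, seed=0):
    """Toy CEM with clip bounding; returns the first action of the best sequence."""
    rng = random.Random(seed)

    def total_reward(actions):
        # Toy rollout: s' = s + a, reward penalizes squared distance to target.
        s, total = state, 0.0
        for a in actions:
            s += a
            total -= (s - target) ** 2
        return total

    # 1. Cold start: center-of-box mean, half-span std (no decay schedule here).
    mu = [0.5 * (a_min + a_max)] * H
    sigma = [0.5 * (a_max - a_min)] * H
    best_seq, best_reward = None, float("-inf")
    for _ in range(K):
        # 2a. Sample N sequences and clamp them to the bounds.
        seqs = [
            [min(max(rng.gauss(m, s), a_min), a_max) for m, s in zip(mu, sigma)]
            for _ in range(N)
        ]
        # 2b-c. Score every sequence and keep the top M (higher reward is better).
        elites = sorted(seqs, key=total_reward, reverse=True)[:M]
        top_reward = total_reward(elites[0])
        if top_reward > best_reward:
            best_seq, best_reward = elites[0], top_reward
        # 2d. Refit mean/std per time step from the elites, with a std floor.
        mu = [statistics.fmean([e[t] for e in elites]) for t in range(H)]
        sigma = [max(statistics.pstdev([e[t] for e in elites]), 1e-6) for t in range(H)]
    # 3. Only the first action is executed; the rest is replanned next call.
    return best_seq[0]

print(cem_plan(state=0.0, target=3.0))
```

With the target above the current state, the returned action is positive and within the bounds; a cost-based variant would sort in ascending order instead.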
- plan(model_fcn: Callable, model_config: Any, reward_fcn: Callable, state: Tensor) → Tensor[source]#
Run one CEM planning call and return the first action to execute.
- Parameters:
model_fcn (Callable) – One-step dynamics function (batched) used by the rollout utility.
model_config (Any) – Additional config passed to your rollout helper.
reward_fcn (Callable) – Function computing rewards from rollout dict; returns (N,) reward per sequence.
state (torch.Tensor) – Current state (batching left to caller/rollout helper).
- Returns:
First action of the best elite sequence, shape (1, dA).
- Return type:
Tensor
Notes
Sampling returns A-space actions in both strategies.
Refit is done in U or A space depending on the bound strategy.
- class prt_rl.model_based.planners.cross_entropy.TanhSquashBound[source]#
Tanh squashing strategy (recommended default).
- The underlying distribution lives in U-space (unbounded). Sampling:
U ~ Normal(mu_u, sigma_u) (shape (H, dA))
A = (tanh(U) + 1)/2 * (a_max - a_min) + a_min (shape (N, H, dA), in bounds)
Refit must be done in U-space. We therefore convert A-space elites back to U-space via an atanh-like transform and compute the new Normal in U.
All methods here are stateless utilities; the planner owns the current distribution.
Shapes#
a_mins, a_maxs : (dA, 1) or (dA,)
distribution.loc/scale : (H, dA)
samples / elites : (N, H, dA)
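The squashing map and its inverse can be written out for scalars. The forward formula is the one given above; the inverse is a sketch of what an "atanh-like transform" could look like, and the clamp by `eps` before atanh is an assumption added so that actions sitting exactly on a bound stay finite. The function names are hypothetical.

```python
import math

def squash(u, a_min, a_max):
    """U-space -> A-space: A = (tanh(U) + 1)/2 * (a_max - a_min) + a_min."""
    return (math.tanh(u) + 1.0) / 2.0 * (a_max - a_min) + a_min

def unsquash(a, a_min, a_max, eps=1e-6):
    """A-space -> U-space via atanh of the action rescaled to (-1, 1)."""
    y = 2.0 * (a - a_min) / (a_max - a_min) - 1.0
    # Assumption: keep y strictly inside (-1, 1) so atanh does not diverge.
    y = min(max(y, -1.0 + eps), 1.0 - eps)
    return math.atanh(y)

a = squash(0.7, a_min=-2.0, a_max=4.0)
print(a, unsquash(a, a_min=-2.0, a_max=4.0))
```

U = 0 maps to the center of the box, and arbitrarily large |U| still yields an action inside [a_min, a_max], which is why the distribution can stay unbounded.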
- static cold_start(H: int, a_mins: Tensor, a_maxs: Tensor, beta: float, tau: float, sigma_u0: float = 0.6, std_min: float = 1e-06) → Normal[source]#
Initialize the U-space Normal used for tanh squashing.
- Parameters:
H (int) – Planning horizon.
a_mins (torch.Tensor) – Bounds with shape (dA,1) or (dA,). Used only for dtype/device; U-space init is centered at zero regardless of bounds.
a_maxs (torch.Tensor) – Bounds with shape (dA,1) or (dA,). Used only for dtype/device; U-space init is centered at zero regardless of bounds.
beta (float) – Long-horizon std decay floor in [0,1]. Effective std(t) = (beta + (1-beta) * exp(-t/tau)) * sigma_u0.
tau (float) – Decay time constant (in steps).
sigma_u0 (float, default: 0.6) – Initial U-space std per dimension at t=0 (before decay). Values in [0.4, 1.0] are robust.
std_min (float, default: 1e-6) – Absolute std floor for numerical stability.
- Returns:
U-space Normal with loc/scale shape (H, dA).
- Return type:
torch.distributions.Normal
- static refit(elites: Tensor, a_mins: Tensor, a_maxs: Tensor, std_min: float = 1e-06) → Normal[source]#
Refit the U-space Normal from A-space elites.
- Parameters:
elites (torch.Tensor) – Elite action sequences in A-space, shape (N_e, H, dA).
a_mins (torch.Tensor) – Bounds with shape (dA,1) or (dA,).
a_maxs (torch.Tensor) – Bounds with shape (dA,1) or (dA,).
std_min (float, default: 1e-6) – Std floor.
- Returns:
Updated U-space Normal with loc/scale (H, dA).
- Return type:
Normal
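A U-space refit can be sketched for one action dimension: map each elite action back to U, then take per-step statistics. The helper name `refit_u`, the `eps` clamp before atanh, and the use of the population standard deviation are assumptions for illustration.

```python
import math
import statistics

def refit_u(elites, a_min, a_max, std_min=1e-6, eps=1e-6):
    """elites: list of N_e A-space sequences (lists of H floats). Returns (mu_u, sigma_u)."""
    def to_u(a):
        # Rescale to (-1, 1), clamp away from the poles, then invert tanh.
        y = 2.0 * (a - a_min) / (a_max - a_min) - 1.0
        y = min(max(y, -1.0 + eps), 1.0 - eps)
        return math.atanh(y)

    H = len(elites[0])
    u = [[to_u(a) for a in seq] for seq in elites]
    # Statistics are taken across elites, separately for each time step.
    mu_u = [statistics.fmean([seq[t] for seq in u]) for t in range(H)]
    sigma_u = [max(statistics.pstdev([seq[t] for seq in u]), std_min) for t in range(H)]
    return mu_u, sigma_u

print(refit_u([[0.5, 0.0], [-0.5, 0.0]], a_min=-1.0, a_max=1.0))
```

Fitting in U rather than A keeps the refit consistent with the space the samples were drawn in; averaging squashed actions directly would bias the mean toward the center of the box.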
- static sample(distribution: Normal, shape: Size, a_mins: Tensor, a_maxs: Tensor, smoothing: str = 'ou', rho: float = 0.5, kernel_size: int = 0) → Tensor[source]#
Sample actions from a U-space Normal, optionally smooth in U, and squash to A-space.
- Parameters:
distribution (Normal) – U-space Normal with loc/scale shape (H, dA).
shape (torch.Size) – Leading sample shape, e.g., (N,) to get (N,H,dA).
a_mins (torch.Tensor) – Bounds, shape (dA,1) or (dA,).
a_maxs (torch.Tensor) – Bounds, shape (dA,1) or (dA,).
smoothing ({"none","ou","conv"}, default: "ou") – Smoothing applied in U-space before squashing.
rho (float, default: 0.5) – OU smoothing factor.
kernel_size (int, default: 0) – Convolution kernel length (only used if smoothing="conv").
- Returns:
Actions in A-space with shape (N, H, dA).
- Return type:
Tensor
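The order of operations (sample in U, smooth in U, squash last) can be sketched for one action dimension. The helper name `tanh_sample` is hypothetical, and passing the first step through the smoother unchanged is an assumption.

```python
import math
import random

def tanh_sample(mu_u, sigma_u, n, a_min, a_max, rho=0.5, seed=0):
    """Sample n U-space sequences, OU-smooth them, then squash into [a_min, a_max]."""
    rng = random.Random(seed)  # seeded only to make the sketch reproducible
    out = []
    for _ in range(n):
        u = [rng.gauss(m, s) for m, s in zip(mu_u, sigma_u)]
        # Smooth in U-space: u[t-1] is already smoothed when u[t] is updated.
        for t in range(1, len(u)):
            u[t] = rho * u[t - 1] + (1.0 - rho) * u[t]
        # Squash last, so every output is in bounds regardless of the smoothing.
        out.append([(math.tanh(v) + 1.0) / 2.0 * (a_max - a_min) + a_min for v in u])
    return out

batch = tanh_sample([0.0] * 4, [0.6] * 4, n=8, a_min=-2.0, a_max=4.0)
print(len(batch), len(batch[0]))
```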
- static warm_start(elites: Tensor, a_mins: Tensor, a_maxs: Tensor, widening_factor: float = 1.3, std_min: float = 1e-06) → Normal[source]#
Warm-start the U-space Normal from previous elites in A-space.
Steps#
1. Convert elites A -> U, compute mean/std across elite batch.
2. Shift μ_u, σ_u forward by one time-step.
3. Tail: set last μ_u to 0 (center in U), keep last σ_u.
4. Widen σ_u[0] by widening_factor to retain agility.
5. (Optional) Anchor μ_u[0] toward the last executed action (converted to U) with convex blend μ_u[0] ← λ * μ_u[0] + (1-λ) * u_exec.
- Parameters:
elites (torch.Tensor) – Elite A-space action sequences from the previous iteration, shape (N_e, H, dA).
a_mins (torch.Tensor) – Bounds, shape (dA,1) or (dA,).
a_maxs (torch.Tensor) – Bounds, shape (dA,1) or (dA,).
widening_factor (float, default: 1.3) – Multiplier for σ_u at t=0 after the shift.
std_min (float, default: 1e-6) – Std floor.
executed_action (torch.Tensor, optional) – Last executed action to anchor to (A-space), shape (1, dA) or (dA,).
anchor_lambda (float, default: 0.8) – Blend weight; larger values rely more on the shifted mean.
- Returns:
Warm-started U-space Normal with loc/scale shape (H, dA).
- Return type:
Normal
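The mean update in steps 2, 3, and 5 can be sketched for one action dimension. This covers only μ_u; the helper name `warm_start_mean_u` and the `eps` clamp used when converting the executed action to U are assumptions, and the argument names follow the parameter list above rather than a confirmed signature.

```python
import math

def warm_start_mean_u(mu_u, executed_action, a_min, a_max, anchor_lambda=0.8, eps=1e-6):
    """Shift the U-space mean by one step, zero the tail, and anchor the first entry."""
    # Step 2-3: drop the first entry, append 0 (the center of the box in U).
    shifted = list(mu_u[1:]) + [0.0]
    # Convert the executed A-space action to U (clamped so atanh stays finite).
    y = 2.0 * (executed_action - a_min) / (a_max - a_min) - 1.0
    y = min(max(y, -1.0 + eps), 1.0 - eps)
    u_exec = math.atanh(y)
    # Step 5: convex blend; larger anchor_lambda relies more on the shifted mean.
    shifted[0] = anchor_lambda * shifted[0] + (1.0 - anchor_lambda) * u_exec
    return shifted

print(warm_start_mean_u([0.1, 0.2, 0.3], executed_action=0.0, a_min=-1.0, a_max=1.0))
```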