Source code for prt_rl.env.wrappers.isaaclab_envs

"""
Wrapper for Isaac Lab environments
"""
import torch
from typing import Optional, Tuple, List, Union, Dict, Any
from prt_rl.env.interface import EnvironmentInterface, EnvParams

import argparse
import sys
import atexit
import gymnasium
from prt_rl.env.wrappers.gymnasium_envs import GymnasiumWrapper

class IsaaclabWrapper(EnvironmentInterface):
    """
    Wraps an Isaac Lab task so it can be used through the EnvironmentInterface API.

    Constructing the wrapper appends the task settings to ``sys.argv``, launches the simulation app with
    ``AppLauncher``, registers an ``atexit`` hook that closes the app, and creates the environment with
    ``gymnasium.make``. Only the ``'policy'`` entry of the observation dictionary is exposed as the state.

    Args:
        env_name (str): Name of the Isaac Lab task, e.g. "Isaac-Ant-Direct-v0".
        render_mode (Optional[str]): Render mode forwarded to the EnvironmentInterface constructor.
        num_envs (int): Number of environments to simulate.
        headless (bool): If True, the ``--headless`` flag is added to the launcher arguments.
    """
    def __init__(self,
                 env_name: str,
                 render_mode: Optional[str] = None,
                 num_envs: int = 1,
                 headless: bool = True
                 ) -> None:
        super().__init__(render_mode, num_envs=num_envs)

        # The launcher settings are handed to the argument parser below through sys.argv
        sys.argv.extend(["--task", env_name, "--num_envs", str(num_envs)])
        if headless:
            sys.argv.append("--headless")

        # Create argument parsing object
        parser = argparse.ArgumentParser("Isaac Lab")
        parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate")
        parser.add_argument("--task", type=str, default=None, help="Name of the task")
        parser.add_argument("--seed", type=int, default=None, help="Seed used for the environment")
        parser.add_argument(
            "--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations"
        )
        parser.add_argument(
            "--distributed", action="store_true", default=False, help="Run training with multiple GPUs or nodes"
        )

        # launch the simulation app
        from isaaclab.app import AppLauncher
        AppLauncher.add_app_launcher_args(parser)

        # The host script may define its own command line flags. parse_known_args ignores the ones this parser
        # does not recognize instead of exiting the process.
        args, _ = parser.parse_known_args()
        app_launcher = AppLauncher(args)

        @atexit.register
        def close_the_simulator():
            app_launcher.app.close()

        # These imports intentionally stay below the AppLauncher call.
        # NOTE(review): presumably they need a running simulation app, and the bare isaaclab_tasks import is
        # kept for its task registration side effect - confirm against the Isaac Lab documentation.
        import isaaclab_tasks  # type: ignore
        from isaaclab_tasks.utils import parse_env_cfg  # type: ignore

        cfg = parse_env_cfg(args.task, device=args.device, num_envs=args.num_envs, use_fabric=not args.disable_fabric)

        if args.distributed:
            cfg.sim.device = f"cuda:{app_launcher.local_rank}"

        # load environment
        self.env = gymnasium.make(env_name, cfg=cfg)

        # Create environment parameter object
        self.env_params = self._make_env_params()

        # The underlying environment is only reset explicitly once; afterwards the cached state is returned
        self.first_reset = True
        self.state = None
        self.info = {}

    def _make_env_params(self) -> EnvParams:
        """
        Creates the environment parameters based on the single-environment action space and the 'policy'
        observation space of the wrapped environment.

        Returns:
            EnvParams: The environment parameters object.

        Raises:
            NotImplementedError: If the action or observation space is neither Discrete nor Box.
        """
        action_space = self.env.unwrapped.single_action_space
        observation_space = self.env.unwrapped.single_observation_space["policy"]

        if isinstance(action_space, gymnasium.spaces.Discrete):
            action_len, act_cont, act_min, act_max = GymnasiumWrapper._get_params_from_discrete(action_space,
                                                                                                is_action=True)
        elif isinstance(action_space, gymnasium.spaces.Box):
            action_len, act_cont, act_min, act_max = GymnasiumWrapper._get_params_from_box(action_space,
                                                                                           is_action=True)
        else:
            raise NotImplementedError(f"{action_space} action space is not supported")

        if isinstance(observation_space, gymnasium.spaces.Discrete):
            obs_shape, obs_cont, obs_min, obs_max = GymnasiumWrapper._get_params_from_discrete(observation_space)
        elif isinstance(observation_space, gymnasium.spaces.Box):
            obs_shape, obs_cont, obs_min, obs_max = GymnasiumWrapper._get_params_from_box(observation_space)
        else:
            raise NotImplementedError(f"{observation_space} observation space is not supported")

        return EnvParams(
            action_len=action_len,
            action_continuous=act_cont,
            action_min=act_min,
            action_max=act_max,
            observation_shape=obs_shape,
            observation_continuous=obs_cont,
            observation_min=obs_min,
            observation_max=obs_max,
        )

    def _reset_on_first_call(self, seed: int | None = None) -> None:
        """
        Resets the wrapped environment the first time it is called and caches the result.

        Later calls do nothing, so the seed only has an effect on the first call.

        Args:
            seed (int | None): Random seed passed to the environment reset.
        """
        if not self.first_reset:
            return

        state, self.info = self.env.reset(seed=seed)
        # The state is a dictionary and the observation is in the key 'policy'. Sometimes there is also a
        # 'critic' key for separate actor/critic observations.
        self.state = state['policy']
        self.first_reset = False

    def get_parameters(self) -> EnvParams:
        """
        Returns the EnvParams object which contains information about the sizes of observations and actions needed
        for setting up RL agents.

        Returns:
            EnvParams: environment parameters object
        """
        return self.env_params

    def reset(self, seed: int | None = None) -> Tuple[torch.Tensor, Dict[str, Any]]:
        """
        Returns the current observation, resetting the wrapped environment only on the first call.

        After the first call the most recent state and info stored by the wrapper are returned unchanged and the
        seed is not used.

        Args:
            seed (int | None): Sets the random seed on the first reset.

        Returns:
            Tuple: Tuple containing the 'policy' observation tensor and the info dictionary
        """
        self._reset_on_first_call(seed)
        return self.state, self.info

    def reset_index(self, index: int, seed: int | None = None) -> Tuple[torch.Tensor, Dict[str, Any]]:
        """
        Returns the current observation of a single environment.

        The wrapped environment is only reset on the first call of either reset method; otherwise the most recent
        stored state is indexed and no reset is requested for the given environment.

        Args:
            index (int): Index of the environment whose observation is returned.
            seed (int | None): Sets the random seed on the first reset.

        Returns:
            Tuple[torch.Tensor, Dict[str, Any]]: The observation at the given index and the info dict
        """
        self._reset_on_first_call(seed)
        return self.state[index], self.info

    def step(self, action: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, Dict[str, Any]]:
        """
        Steps the simulation using the action tensor and returns the new trajectory.

        Args:
            action (torch.Tensor): Action tensor with shape (# env, # actions)

        Returns:
            Tuple: Tuple containing the next 'policy' observation, the reward and done tensors each with a trailing
                dimension added, and the info dictionary. Done is the logical OR of terminated and truncated.
        """
        next_state, reward, terminated, truncated, self.info = self.env.step(action)
        done = torch.logical_or(terminated, truncated)

        # Keep the latest observation so the reset methods can return it without touching the simulator
        self.state = next_state['policy']

        return next_state['policy'], reward.unsqueeze(-1), done.unsqueeze(-1), self.info

    def close(self) -> None:
        """
        Closes the wrapped environment. The simulation app itself is closed by the atexit hook registered in
        the constructor.
        """
        self.env.close()
if __name__ == '__main__':
    # Manual smoke test: build the wrapper for the Ant task, print the first observation and shut down.
    demo_env = IsaaclabWrapper(env_name="Isaac-Ant-Direct-v0", num_envs=5)
    observation, extras = demo_env.reset()
    for item in (observation.shape, observation, extras):
        print(item)
    demo_env.close()