Source code for prt_rl.common.loggers

"""
Metric and artifact loggers


"""
import json
import matplotlib.pyplot as plt
import mlflow
import numpy as np
import os
import shutil
import torch
from typing import Optional


[docs] class Logger: """ Based class for implementing loggers for RL algorithms. """ def __init__( self, logging_freq: int = 1, ) -> None: self.logging_freq = logging_freq self.last_logging_iteration = 0
[docs] def close(self): """ Performs any necessary logger cleanup. """ pass
[docs] def should_log(self, iteration: int) -> bool: """ Determines whether to log based on the current iteration and logging frequency. Args: iteration (int): Current iteration number. Returns: bool: True if logging should occur, False otherwise. """ iteration += 1 # Adjust for 0-based indexing current_interval = iteration // self.logging_freq last_interval = self.last_logging_iteration // self.logging_freq if current_interval > last_interval: self.last_logging_iteration = iteration return True return False
[docs] def log_parameters( self, params: dict, ) -> None: """ Logs a dictionary of parameters. Parameters are values used to initialize but do not change throughout training. Args: params (dict): Dictionary of parameters. """ pass
[docs] def log_scalar( self, name: str, value: float, iteration: Optional[int] = None, ) -> None: """ Logs a scalar value. Scalar values are any metric or value that changes throughout training. Args: name (str): Name of the scalar value. value (float): Value of the scalar value. iteration (int, optional): Iteration number. """ pass
[docs] def log_metrics(self, metrics: dict[str, float], iteration: int | None = None ) -> None: """ Logs multiple scalar metrics. Args: metrics (dict): Dictionary of metric names and their corresponding values. iteration (int, optional): Iteration number. """ for name, value in metrics.items(): self.log_scalar(name, value, iteration)
[docs] def log_artifact( self, path: str, *, name: str | None = None, type: str | None = None, step: int | None = None, metadata: dict | None = None, aliases: list[str] | None = None, ) -> None: """ Logs an artifact file. Args: path (str): Path to the artifact file. name (str): Name of the artifact. """ pass
class FileLogger(Logger):
    """
    Logger that keeps parameters and scalars in memory and writes them to
    JSON files in ``output_dir`` when ``close`` is called.

    Args:
        output_dir (str): Directory that receives ``parameters.json``,
            ``scalars.json`` and any files stored with ``log_file``. It is
            created if it does not exist.
        logging_freq (int): Logging interval forwarded to ``Logger``.
    """

    def __init__(
        self,
        output_dir: str,
        logging_freq: int = 1,
    ) -> None:
        super().__init__(logging_freq=logging_freq)
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)  # Ensure the output directory exists
        self.parameters = {}  # name -> value, merged by log_parameters
        self.scalars = {}  # name -> list of (iteration, value) tuples

    @staticmethod
    def _to_serializable(obj):
        """
        Converts numpy / torch values to plain Python objects; returns anything else unchanged.
        """
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, torch.Tensor):
            # Single-element tensors become a Python scalar, larger ones a nested list.
            return obj.item() if obj.numel() == 1 else obj.tolist()
        return obj

    @staticmethod
    def _json_default(obj):
        """
        ``json.dump`` fallback: converts numpy / torch values, rejects everything else.
        """
        converted = FileLogger._to_serializable(obj)
        if converted is obj:
            # json requires the hook to raise TypeError for unsupported objects;
            # returning the same object would be reported as a circular reference.
            raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
        return converted

    def close(self):
        """
        Writes the saved parameters and scalar metrics to a file.

        ``parameters.json`` receives the merged parameter dictionary and
        ``scalars.json`` maps each scalar name to a list of
        ``[iteration, value]`` pairs. Numpy scalars/arrays and torch tensors
        are converted to plain Python values in both files, so a numpy or
        torch parameter no longer aborts the write.
        """
        param_file_path = os.path.join(self.output_dir, "parameters.json")
        with open(param_file_path, "w") as f:
            json.dump(self.parameters, f, indent=4, default=self._json_default)

        serializable_scalars = {
            name: [(int(step), self._to_serializable(value)) for step, value in series]
            for name, series in self.scalars.items()
        }
        scalar_file_path = os.path.join(self.output_dir, "scalars.json")
        with open(scalar_file_path, "w") as f:
            json.dump(serializable_scalars, f, indent=4, default=self._json_default)

    def log_parameters(
        self,
        params: dict,
    ) -> None:
        """
        Logs a dictionary of parameters.

        The values are merged into the in-memory parameter dictionary; keys
        logged earlier are overwritten by later calls.

        Args:
            params (dict): Dictionary of parameters.
        """
        self.parameters.update(params)

    def log_scalar(
        self,
        name: str,
        value: float,
        iteration: Optional[int] = None,
    ) -> None:
        """
        Logs scalar values, storing them sequentially or with a provided iteration number.

        Args:
            name (str): Name of the scalar value.
            value (float): Value of the scalar value.
            iteration (int, optional): Iteration number. When omitted, the number
                of values already stored under ``name`` is used.
        """
        series = self.scalars.setdefault(name, [])
        if iteration is None:
            iteration = len(series)
        series.append((iteration, value))

    def log_file(self, path: str, name: str, move: bool = False) -> None:
        """
        Saves the given file to the output_dir/name folder. Creates the folder if it does not exist.

        Args:
            path (str): Path of the file to store.
            name (str): Sub-folder of ``output_dir`` that receives the file.
            move (bool): If True the file is moved, otherwise it is copied.
        """
        target_dir = os.path.join(self.output_dir, name)
        os.makedirs(target_dir, exist_ok=True)
        target_path = os.path.join(target_dir, os.path.basename(path))
        if move:
            shutil.move(path, target_path)
        else:
            shutil.copy(path, target_path)
# def save_policy(self, policy, name: str = "policy") -> None: # policy_path = os.path.join(self.output_dir, name) # os.makedirs(policy_path, exist_ok=True) # torch.save(policy, os.path.join(policy_path, "model.pth"))
class MLFlowLogger(Logger):
    """
    MLFlow Logger

    Configures the MLflow tracking and registry URIs, selects the experiment
    and starts a run on construction; ``close`` ends the active run.

    Notes:
        psutil must be installed with pip to log system cpu metrics.
        pynvml must be installed with pip to log gpu metrics.

    References:
        [1] https://mlflow.org/docs/latest/python_api/mlflow.html

    Args:
        experiment_name (str): Name of the MLflow experiment.
        tracking_uri (str, optional): Tracking server URI. When omitted, the
            ``MLFLOW_TRACKING_URI`` environment variable is used, falling back
            to ``http://localhost:5000``. The same URI is used for the registry.
        run_name (str, optional): Name given to the started run.
        log_system_metrics (bool): Forwarded to ``mlflow.start_run``.
        logging_freq (int): Logging interval forwarded to ``Logger``.
    """

    def __init__(
        self,
        experiment_name: str,
        *,
        tracking_uri: str | None = None,
        run_name: Optional[str] = None,
        log_system_metrics: bool = False,
        logging_freq: int = 1,
    ) -> None:
        super().__init__(logging_freq=logging_freq)
        if tracking_uri is None:
            tracking_uri = os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000")
        self.tracking_uri = tracking_uri
        self.experiment_name = experiment_name
        self.run_name = run_name
        # Step used by log_scalar when the caller does not supply an iteration.
        self.iteration = 0

        mlflow.set_tracking_uri(self.tracking_uri)
        mlflow.set_registry_uri(self.tracking_uri)
        mlflow.set_experiment(self.experiment_name)
        self.run = mlflow.start_run(
            run_name=self.run_name,
            log_system_metrics=log_system_metrics,
        )

    def close(self):
        """
        Closes and cleans up the MLFlow logger.
        """
        mlflow.end_run()

    def log_parameters(
        self,
        params: dict,
    ) -> None:
        """
        Logs a dictionary of parameters. Parameters are values used to
        initialize but do not change throughout training.

        Args:
            params (dict): Dictionary of parameters.
        """
        mlflow.log_params(params)

    def log_scalar(
        self,
        name: str,
        value: float,
        iteration: Optional[int] = None
    ) -> None:
        """
        Logs a scalar value to MLFlow.

        Args:
            name (str): Name of the scalar value.
            value (float): Value of the scalar value.
            iteration (int, optional): Iteration number. When omitted, the
                logger's internal counter is used as the step and then
                incremented; when given, the counter is set to this value.
        """
        # Passing step=None would make MLflow record the metric at step 0,
        # so fall back to the internal counter instead.
        step = self.iteration if iteration is None else iteration
        mlflow.log_metric(name, value, step=step)
        if iteration is None:
            self.iteration += 1
        else:
            self.iteration = iteration

    def log_figure(
        self,
        fig,
        name: str,
        iteration: Optional[int] = None,
    ) -> None:
        """
        Logs a matplotlib figure to MLFlow as an artifact.

        The artifact is stored as ``{name}_{iteration}.png``, or
        ``{name}_final.png`` when no iteration is given. The figure is closed
        afterwards.

        Args:
            fig (matplotlib.figure.Figure): The figure to log.
            name (str): Name of the figure artifact.
            iteration (int, optional): Iteration number for logging.
        """
        iteration_str = f"{iteration}" if iteration is not None else "final"
        mlflow.log_figure(fig, f"{name}_{iteration_str}.png")
        plt.close(fig)  # Close the figure to free up memory

    def log_tags(self, tags: dict) -> None:
        """
        Logs a dictionary of tags to MLFlow.

        Args:
            tags (dict): Dictionary of tags to log.
        """
        mlflow.set_tags(tags)

    def log_directory(self, dir: str, path: str | None = None) -> None:
        """
        Logs the contents of a local directory as artifacts to MLFlow.

        Args:
            dir (str): Path of the local directory whose contents are logged.
            path (str, optional): Artifact path within the run under which the
                contents are stored.
        """
        mlflow.log_artifacts(local_dir=dir, artifact_path=path)
# def save_agent(self, agent: object, agent_name: str = "agent.pt") -> None: # """ # Saves the agent to the MLFlow run. # Args: # agent (object): The agent object to save # """ # with tempfile.TemporaryDirectory() as tmpdir: # save_path = os.path.join(tmpdir, agent_name) # torch.save(agent, save_path) # mlflow.log_artifact(save_path, artifact_path="agent")
class ClearMLLogger(Logger):
    """
    A lightweight adapter that logs training runs to **ClearML**.

    This logger mirrors the behavior of an MLflow-style logger while using
    ClearML primitives:

    - Creates a ClearML **Task** under the specified project.
    - Logs **parameters** through ``Task.set_parameters_as_dict``.
    - Logs **scalars** (metrics) with optional ``"group/metric"`` naming that
      maps to ClearML's *(title, series)* convention.

    The class expects ClearML to be installed and configured (e.g., via
    `clearml-init` or environment variables).

    Attributes
    ----------
    project_name : str
        Name of the ClearML project under which the Task is created.
    task_name : str
        Name of the ClearML Task (run) created for this logger instance.
    logging_freq : int
        Frequency hint inherited from :class:`Logger`. ``log_scalar`` does not
        consult it; the training loop is responsible for honoring it.
    iteration : int
        Counter used as the default `iteration` (step) for scalar logging when
        none is supplied explicitly. Starts at 0.
    task : clearml.Task
        The underlying ClearML Task object created during initialization.
    _logger : clearml.Logger
        The ClearML experiment logger obtained from `task.get_logger()`.

    Notes
    -----
    - **Scalar naming:** If you pass `"loss/train"` to `log_scalar`, it is
      reported with `title="loss"` and `series="train"`. If no slash is
      present, the title defaults to `"metrics"` and the entire name becomes
      the series.
    - **Parameters:** Parameters are intended to be *static* (do not change
      over training).
    - **Artifacts / models:** this class currently defines no helper for saving
      agents or policies; the `save_agent` implementation is commented out.
    - **Environment:** Ensure ClearML is configured to point at your server
      (self-hosted or SaaS) via `clearml-init` or environment variables
      (`CLEARML_API_HOST`, `CLEARML_API_ACCESS_KEY`, `CLEARML_API_SECRET_KEY`).

    Examples
    --------
    Basic usage in a training loop:

    >>> logger = ClearMLLogger(project_name="Demo/PRT", task_name="PPO CartPole")
    >>> logger.log_parameters({"algo": "PPO", "seed": 42, "lr": 3e-4})
    >>> for step in range(1000):
    ...     loss = 1.0 / (step + 1)
    ...     reward = step * 0.1
    ...     logger.log_scalar("loss/train", loss, iteration=step)
    ...     logger.log_scalar("reward/mean", reward, iteration=step)
    >>> logger.close()

    See Also
    --------
    clearml.Task : Underlying experiment/run object.

    Raises
    ------
    ImportError
        If the `clearml` package is not installed or cannot be imported.
    """

    def __init__(
        self,
        project_name: str,
        task_name: str,
        logging_freq: int = 1,
    ) -> None:
        # clearml is imported lazily so the module stays importable without it.
        try:
            import clearml
        except ImportError as e:
            raise ImportError("You need to install clearml to use this logger.") from e
        self.clearml = clearml

        super().__init__(logging_freq=logging_freq)
        self.project_name = project_name
        self.task_name = task_name
        # Default step for log_scalar; must exist before the first auto-stepped call.
        self.iteration = 0

        self.task = self.clearml.Task.init(
            project_name=self.project_name, task_name=self.task_name
        )
        self._logger = self.task.get_logger()

    def close(self):
        """
        Closes and cleans up the ClearML logger.
        """
        self.task.close()

    def log_parameters(
        self,
        params: dict,
    ) -> None:
        """
        Logs a dictionary of parameters. Parameters are values used to
        initialize but do not change throughout training.

        Args:
            params (dict): Dictionary of parameters.
        """
        self.task.set_parameters_as_dict(params)

    def log_scalar(
        self,
        name: str,
        value: float,
        iteration: Optional[int] = None,
    ) -> None:
        """
        Logs a scalar value. Scalar values are any metric or value that
        changes throughout training.

        Args:
            name (str): Name of the scalar value. ``"group/metric"`` is split at
                the first slash into ClearML's (title, series); a name without
                a slash is reported under the title ``"metrics"``.
            value (float): Value of the scalar value.
            iteration (int, optional): Iteration number. When omitted, the
                logger's internal counter is used and then incremented; when
                given, the counter is set to this value.
        """
        # ClearML expects a (title, series) breakdown.
        if "/" in name:
            title, series = name.split("/", 1)
        else:
            title, series = "metrics", name

        step = self.iteration if iteration is None else iteration
        self._logger.report_scalar(
            title=title, series=series, value=value, iteration=step
        )

        # Keep the local counter in sync with what was just reported.
        if iteration is None:
            self.iteration += 1
        else:
            self.iteration = iteration
# def save_agent( # self, # agent: object, # ) -> None: # """ # Saves the agent to the logger. # Args: # agent (object): Agent to save. # """ # with tempfile.TemporaryDirectory() as tmpdir: # filename = "agent.pt" # save_path = os.path.join(tmpdir, filename) # torch.save(agent, save_path) # # Upload file as an artifact (keeps original filename) # self.task.upload_artifact( # name="agent", artifact_object=save_path, metadata={"filename": filename} # )