from abc import ABC, abstractmethod
import optuna
import numpy as np
from typing import Callable, Dict, Any
from concurrent.futures import ProcessPoolExecutor, as_completed
from sklearn.model_selection import ParameterGrid
import multiprocessing
from tqdm import tqdm
import signal
import psutil
import os
[docs]
class HyperparameterTuner(ABC):
"""
Abstract base class for implementing hyperparameter tuners.
"""
[docs]
@abstractmethod
def tune(self,
objective_fcn: Callable[[Dict], float],
parameters: dict,
) -> Dict[str, Any]:
"""
Tune the hyperparameters of the given objective function.
Args:
objective_fcn (Callable[[Dict], float]): The objective function to be optimized.
parameters (dict): The parameter dictionary that specifies the types and ranges to optimize.
Returns:
Dict[str, Any]: The best hyperparameters found during tuning.
"""
pass
[docs]
class OptunaTuner(HyperparameterTuner):
"""
Hyperparameter tuning using Optuna.
Args:
total_trials (int): The number of trials to run.
maximize (bool): Whether to maximize the objective function. Default is True.
num_jobs (int): The number of parallel jobs to run. Default is -1 (use all available cores).
Example:
.. python::
param_dict = {
'x': {
'type': 'float',
'low': 0.1,
'high': 1.0
},
'y': {
'type': 'int',
'low': 1,
'high': 10,
'step': 1
}
}
def objective(params):
# Define your objective function here
return (params['x'] - 2) ** 2 + (params['y'] - 5) ** 2
tuner = OptunaTuner(total_trials=100)
best_params = tuner.tune(objective, parameters=param_dict)
"""
def __init__(self,
total_trials: int,
maximize: bool = True,
num_jobs: int = -1,
) -> None:
self.total_trials = total_trials
self.maximize = maximize
self.num_jobs = num_jobs
[docs]
def tune(self,
objective_fcn: Callable,
parameters: dict,
) -> Dict[str, Any]:
"""
Tune the hyperparameters of the given model using Optuna.
Args:
objective_fcn (Callable[[Dict], float]): The objective function to be optimized.
parameters (dict): The parameter dictionary that specifies the types and ranges to optimize.
Returns:
Dict[str, Any]: The best hyperparameters found during tuning.
"""
study = optuna.create_study(direction="maximize" if self.maximize else "minimize")
study.optimize(lambda trial: OptunaTuner._objective(objective_fcn, trial, parameters),
catch=AssertionError,
show_progress_bar=True,
n_trials=self.total_trials,
n_jobs=self.num_jobs)
trial = study.best_trial
return trial.params
@staticmethod
def _objective(obj_fcn: Callable, trial: optuna.Trial, param_dict: dict) -> float:
"""
Objective function for Optuna to optimize.
Args:
obj_fcn (callable): The objective function to be optimized.
trial (optuna.Trial): The Optuna trial object.
param_dict (dict): The parameter dictionary.
Returns:
The objective function value.
"""
params = OptunaTuner._configure_params(trial, param_dict)
return obj_fcn(params)
@staticmethod
def _configure_params(trial: optuna.Trial, param_dict: dict) -> dict:
"""
Converts parameter dictionary definition to register values with the trial.
Args:
trial (optuna.Trial): The Optuna trial object.
param_dict (dict): Parameter dictionary that specifies the types and ranges to optimize
Returns:
A dictionary of parameter keys with the value for the current trial
"""
params = {}
for key, value in param_dict.items():
val_type = value['type']
if val_type == 'float':
log_val = value['log'] if 'log' in value else False
step = value['step'] if 'step' in value else None
if log_val and step is not None:
raise ValueError("Log scale and step cannot be used together.")
params[key] = trial.suggest_float(key, low=value['low'], high=value['high'], log=log_val, step=step)
elif val_type == 'categorical':
params[key] = trial.suggest_categorical(key, value['values'])
elif val_type == 'int':
log_val = value['log'] if 'log' in value else False
step = value['step'] if 'step' in value else 1
if log_val and step != 1:
raise ValueError("Log scale and step cannot be used together.")
params[key] = trial.suggest_int(key, low=value['low'], high=value['high'], log=log_val, step=step)
else:
raise ValueError(f"Unsupported parameter type: {val_type}")
return params
[docs]
class GridSearchTuner(HyperparameterTuner):
"""
Hyperparameter tuning using Grid Search.
Args:
total_trials (int): The number of trials to run.
maximize (bool): Whether to maximize the objective function. Default is True.
num_jobs (int): The number of parallel jobs to run. Default is -1 (use all available cores).
"""
def __init__(self,
total_trials: int,
maximize: bool = True,
num_jobs: int = -1,
) -> None:
# Multiprocessing start method must be set to 'spawn' for compatibility with Pytorch's CUDA backend.
if multiprocessing.get_start_method(allow_none=True) != 'spawn':
try:
multiprocessing.set_start_method('spawn', force=True)
except RuntimeError:
raise RuntimeError("Failed to set multiprocessing start method to 'spawn'. "
"Ensure that this is called at the start of your script.")
self.total_trials = total_trials
self.maximize = maximize
self.num_jobs = num_jobs if num_jobs != -1 else multiprocessing.cpu_count()
[docs]
def tune(self,
objective_fcn: Callable,
parameters: dict,
) -> Dict[str, Any]:
"""
Tune the hyperparameters of the given model using Grid Search.
Args:
objective_fcn (Callable[[Dict], float]): The objective function to be optimized.
parameters (dict): The parameter dictionary that specifies the types and ranges to optimize.
Returns:
Dict[str, Any]: The best hyperparameters found during tuning.
"""
grid = list(ParameterGrid(self._convert_parameters(parameters)))
best_score = float('-inf') if self.maximize else float('inf')
best_params = None
params = []
scores = []
# Custom signal handler
interrupted = False
def signal_handler(sig, frame):
nonlocal interrupted
print("\n🔴 Received Ctrl+C. Attempting to shut down gracefully...")
interrupted = True
# Register the signal handler
signal.signal(signal.SIGINT, signal_handler)
# Use process-based parallelism
try:
with ProcessPoolExecutor(max_workers=self.num_jobs) as executor:
futures = {executor.submit(objective_fcn, params): params for params in grid}
with tqdm(total=len(futures), desc="Grid Search", unit="trial", position=0) as pbar:
for future in as_completed(futures):
# Handle Ctrl+C interruption to shutdown the current process and any queued tasks
if interrupted:
executor.shutdown(wait=False, cancel_futures=True)
self._kill_child_processes()
raise KeyboardInterrupt("Grid search interrupted by user.")
param_set = futures[future]
params.append(param_set)
try:
score = future.result()
scores.append(score)
is_better = (
(self.maximize and score > best_score) or
(not self.maximize and score < best_score)
)
if is_better:
best_score = score
best_params = param_set
print(f"🔥 New best trial! Score: {best_score:.4f}, Params: {best_params}")
except Exception as e:
scores.append(None)
print(f"❌ Trial failed for {params}: {e}")
finally:
pbar.update(1)
except KeyboardInterrupt:
raise KeyboardInterrupt("Grid search interrupted by user.")
return {'best_params': best_params, 'best_score': best_score, 'scores': scores, 'params': params}
@staticmethod
def _convert_parameters(parameters: dict) -> Dict[str, Any]:
"""
Convert the parameter dictionary to a format suitable for grid search.
Args:
parameters (dict): The parameter dictionary.
Returns:
Dict[str, Any]: The converted parameter dictionary.
"""
converted = {}
for key, value in parameters.items():
if value['type'] == 'categorical':
converted[key] = value['values']
elif value['type'] == 'float':
if 'step' in value:
step = value['step']
else:
step = (value['high'] - value['low']) / 10
converted[key] = np.arange(value['low'], value['high'], step)
elif value['type'] == 'int':
if 'step' in value:
step = value['step']
else:
step = 1
converted[key] = np.arange(value['low'], value['high'], step)
else:
raise ValueError(f"Unsupported parameter type: {value['type']}")
return converted
@staticmethod
def _kill_child_processes():
parent = psutil.Process(os.getpid())
children = parent.children(recursive=True)
for child in children:
child.terminate()
psutil.wait_procs(children, timeout=5)