Skip to content

Probabilistic Calibration API

Unified API

Probabilistic calibration is accessed through the Calibrator class using the run_probabilistic() method. See the Calibrator API for the complete API reference.

Quick Reference

from commol import Calibrator, CalibrationProblem, ProbabilisticCalibrationConfig

# Configure probabilistic calibration in the problem
problem = CalibrationProblem(
    observed_data=observed_data,
    parameters=parameters,
    loss_function="sse",
    optimization_config=pso_config,
    probabilistic_config=ProbabilisticCalibrationConfig(n_runs=20),
)

# Use the unified Calibrator with run_probabilistic()
calibrator = Calibrator(simulation, problem)
result = calibrator.run_probabilistic()

ProbabilisticCalibrationConfig

ProbabilisticCalibrationConfig

Bases: BaseModel

Unified configuration for probabilistic calibration.

This configuration groups all settings for the probabilistic calibration workflow, which finds an ensemble of parameter sets with uncertainty quantification instead of a single optimal solution.

The workflow consists of: 1. Run: Multiple independent calibration runs 2. Evaluation Processing: Deduplication and filtering 3. Clustering: Group similar solutions 4. Representative Selection: Pick diverse solutions from clusters 5. Ensemble Selection: NSGA-II multi-objective optimization 6. Statistics: Calculate confidence intervals and coverage

Attributes:

Name Type Description
n_runs int

Number of independent calibration runs to perform (default: 50). More runs provide better parameter space exploration but take longer.

evaluation_processing ProbEvaluationFilterConfig

Configuration for evaluation deduplication and filtering

clustering ProbClusteringConfig

Configuration for clustering parameter space

representative_selection ProbRepresentativeConfig

Configuration for selecting representatives from clusters

ensemble_selection ProbEnsembleConfig

Configuration for NSGA-II ensemble selection

confidence_level float

Confidence interval level (default: 0.95 for 95% CI). Must be in range (0.0, 1.0).

Source code in commol/context/probabilistic_calibration.py
class ProbabilisticCalibrationConfig(BaseModel):
    """
    Unified configuration for probabilistic calibration.

    This configuration groups all settings for the probabilistic calibration
    workflow, which finds an ensemble of parameter sets with uncertainty
    quantification instead of a single optimal solution.

    The workflow consists of:
    1. Run: Multiple independent calibration runs
    2. Evaluation Processing: Deduplication and filtering
    3. Clustering: Group similar solutions
    4. Representative Selection: Pick diverse solutions from clusters
    5. Ensemble Selection: NSGA-II multi-objective optimization
    6. Statistics: Calculate confidence intervals and coverage

    Attributes
    ----------
    n_runs : int
        Number of independent calibration runs to perform (default: 50).
        More runs provide better parameter space exploration but take longer.
    evaluation_processing : ProbEvaluationFilterConfig
        Configuration for evaluation deduplication and filtering
    clustering : ProbClusteringConfig
        Configuration for clustering parameter space
    representative_selection : ProbRepresentativeConfig
        Configuration for selecting representatives from clusters
    ensemble_selection : ProbEnsembleConfig
        Configuration for NSGA-II ensemble selection
    confidence_level : float
        Confidence interval level (default: 0.95 for 95% CI).
        Must be in range (0.0, 1.0).
    """

    n_runs: int = Field(
        default=10, gt=0, description="Number of calibration runs to perform"
    )
    evaluation_processing: ProbEvaluationFilterConfig = Field(
        default_factory=ProbEvaluationFilterConfig,
        description="Configuration for evaluation processing",
    )
    clustering: ProbClusteringConfig = Field(
        default_factory=ProbClusteringConfig,
        description="Configuration for clustering parameter space",
    )
    representative_selection: ProbRepresentativeConfig = Field(
        default_factory=ProbRepresentativeConfig,
        description="Configuration for representative selection",
    )
    ensemble_selection: ProbEnsembleConfig = Field(
        default_factory=ProbEnsembleConfig,
        description="Configuration for NSGA-II ensemble selection",
    )
    confidence_level: float = Field(
        default=0.95,
        gt=0.0,
        lt=1.0,
        description="Confidence interval level (e.g., 0.95 for 95% CI)",
    )

options: show_root_heading: true show_source: false heading_level: 3 show_docstring_attributes: true

ProbabilisticCalibrationResult

ProbabilisticCalibrationResult

Bases: BaseModel

Complete result from probabilistic calibration with ensemble analysis.

Contains the selected optimal ensemble solution, all Pareto-optimal solutions from multi-objective optimization, and metadata about the calibration process.

Attributes:

Name Type Description
selected_ensemble ParetoSolution

The optimal ensemble solution selected based on pareto_preference

pareto_front list[ParetoSolution]

All Pareto-optimal ensemble solutions from NSGA-II optimization These represent different trade-offs between CI width and coverage

selected_pareto_index int

Index in pareto_front of the selected solution

n_runs_performed int

Number of calibration runs performed

n_unique_evaluations int

Number of unique parameter evaluations after deduplication

n_clusters_used int

Number of clusters identified in parameter space

confidence_level float

Confidence level used for interval calculation (e.g., 0.95 for 95% CI)

Source code in commol/context/probabilistic_calibration.py
class ProbabilisticCalibrationResult(BaseModel):
    """
    Complete result from probabilistic calibration with ensemble analysis.

    Contains the selected optimal ensemble solution, all Pareto-optimal
    solutions from multi-objective optimization, and metadata about the
    calibration process.

    Attributes
    ----------
    selected_ensemble : ParetoSolution
        The optimal ensemble solution selected based on pareto_preference
    pareto_front : list[ParetoSolution]
        All Pareto-optimal ensemble solutions from NSGA-II optimization
        These represent different trade-offs between CI width and coverage
    selected_pareto_index : int
        Index in pareto_front of the selected solution
    n_runs_performed : int
        Number of calibration runs performed
    n_unique_evaluations : int
        Number of unique parameter evaluations after deduplication
    n_clusters_used : int
        Number of clusters identified in parameter space
    confidence_level : float
        Confidence level used for interval calculation (e.g., 0.95 for 95% CI)
    """

    selected_ensemble: ParetoSolution = Field(
        description="The optimal ensemble solution selected based on preference"
    )
    pareto_front: list[ParetoSolution] = Field(
        description="All Pareto-optimal ensemble solutions from NSGA-II"
    )
    selected_pareto_index: int = Field(
        description="Index in pareto_front of the selected solution"
    )
    n_runs_performed: int = Field(description="Number of calibration runs performed")
    n_unique_evaluations: int = Field(
        description="Number of unique evaluations after deduplication"
    )
    n_clusters_used: int = Field(description="Number of clusters identified")
    confidence_level: float = Field(
        description="Confidence level used (e.g., 0.95 for 95% CI)"
    )

options: show_root_heading: true show_source: false heading_level: 3 show_docstring_attributes: true

ParetoSolution

ParetoSolution

Bases: BaseModel

A complete ensemble solution with statistics and predictions.

Represents a single ensemble of parameter sets, containing the ensemble composition, parameter statistics, model predictions with confidence intervals, and performance metrics.

Attributes:

Name Type Description
ensemble_size int

Number of parameter sets in this ensemble

selected_indices list[int]

Indices of parameter sets selected for this ensemble

ensemble_parameters list[dict[str, float]]

List of parameter dictionaries in this ensemble

parameter_statistics dict[str, ParameterSetStatistics]

Statistics for each parameter across the ensemble

prediction_median dict[str, list[float]]

Median predictions for each compartment over time

prediction_ci_lower dict[str, list[float]]

Lower bound of confidence interval for each compartment over time

prediction_ci_upper dict[str, list[float]]

Upper bound of confidence interval for each compartment over time

coverage_percentage float

Percentage of observed data points within the confidence intervals

average_ci_width float

Average width of confidence intervals across time and compartments

ci_width float

Normalized confidence interval width objective [0, 1] used in optimization

coverage float

Normalized coverage objective [0, 1] used in optimization

size_penalty float

Size constraint penalty applied during optimization [0, infinity]

Source code in commol/context/probabilistic_calibration.py
class ParetoSolution(BaseModel):
    """
    A complete ensemble solution with statistics and predictions.

    Represents a single ensemble of parameter sets, containing the ensemble
    composition, parameter statistics, model predictions with confidence
    intervals, and performance metrics.

    Attributes
    ----------
    ensemble_size : int
        Number of parameter sets in this ensemble
    selected_indices : list[int]
        Indices of parameter sets selected for this ensemble
    ensemble_parameters : list[dict[str, float]]
        List of parameter dictionaries in this ensemble
    parameter_statistics : dict[str, ParameterSetStatistics]
        Statistics for each parameter across the ensemble
    prediction_median : dict[str, list[float]]
        Median predictions for each compartment over time
    prediction_ci_lower : dict[str, list[float]]
        Lower bound of confidence interval for each compartment over time
    prediction_ci_upper : dict[str, list[float]]
        Upper bound of confidence interval for each compartment over time
    coverage_percentage : float
        Percentage of observed data points within the confidence intervals
    average_ci_width : float
        Average width of confidence intervals across time and compartments
    ci_width : float
        Normalized confidence interval width objective [0, 1] used in optimization
    coverage : float
        Normalized coverage objective [0, 1] used in optimization
    size_penalty : float
        Size constraint penalty applied during optimization [0, infinity]
    """

    ensemble_size: int = Field(description="Number of parameter sets in this ensemble")
    selected_indices: list[int] = Field(
        description="Indices of selected parameter sets"
    )
    ensemble_parameters: list[dict[str, float]] = Field(
        description="List of parameter dictionaries in this ensemble"
    )
    parameter_statistics: dict[str, ParameterSetStatistics] = Field(
        description="Statistics for each parameter across the ensemble"
    )
    prediction_median: dict[str, list[float]] = Field(
        description="Median predictions for each compartment over time"
    )
    prediction_ci_lower: dict[str, list[float]] = Field(
        description="Lower bound of confidence interval for each compartment over time"
    )
    prediction_ci_upper: dict[str, list[float]] = Field(
        description="Upper bound of confidence interval for each compartment over time"
    )
    coverage_percentage: float = Field(
        description="Percentage of observed data points within confidence intervals"
    )
    average_ci_width: float = Field(description="Average width of confidence intervals")
    ci_width: float = Field(description="Normalized CI width objective [0, 1]")
    coverage: float = Field(description="Normalized coverage objective [0, 1]")
    size_penalty: float = Field(description="Size constraint penalty [0, infinity]")

options: show_root_heading: true show_source: false heading_level: 3 show_docstring_attributes: true

ParameterSetStatistics

ParameterSetStatistics

Bases: BaseModel

Statistics for a parameter across the ensemble.

Attributes:

Name Type Description
mean float

Mean value across ensemble

median float

Median value across ensemble

std float

Standard deviation across ensemble

percentile_lower float

Lower percentile bound (e.g., 2.5th for 95% CI)

percentile_upper float

Upper percentile bound (e.g., 97.5th for 95% CI)

min float

Minimum value in ensemble

max float

Maximum value in ensemble

Source code in commol/context/probabilistic_calibration.py
class ParameterSetStatistics(BaseModel):
    """
    Statistics for a parameter across the ensemble.

    Attributes
    ----------
    mean : float
        Mean value across ensemble
    median : float
        Median value across ensemble
    std : float
        Standard deviation across ensemble
    percentile_lower : float
        Lower percentile bound (e.g., 2.5th for 95% CI)
    percentile_upper : float
        Upper percentile bound (e.g., 97.5th for 95% CI)
    min : float
        Minimum value in ensemble
    max : float
        Maximum value in ensemble
    """

    mean: float = Field(description="Mean value across ensemble")
    median: float = Field(description="Median value across ensemble")
    std: float = Field(description="Standard deviation across ensemble")
    percentile_lower: float = Field(
        description="Lower percentile bound of confidence interval"
    )
    percentile_upper: float = Field(
        description="Upper percentile bound of confidence interval"
    )
    min: float = Field(description="Minimum value in ensemble")
    max: float = Field(description="Maximum value in ensemble")

options: show_root_heading: true show_source: false heading_level: 3 show_docstring_attributes: true

CalibrationEvaluation

CalibrationEvaluation dataclass

A single calibration evaluation result.

This dataclass represents a parameter set evaluation from calibration, including the parameters, loss value, and optionally predictions.

Attributes:

Name Type Description
parameters list[float]

Parameter values for this evaluation

loss float

Loss/objective function value

parameter_names list[str]

Names of the parameters (in same order as parameters list)

predictions list[list[float]] | None

Optional predictions matrix with shape (time_steps, compartments)

Source code in commol/context/probabilistic_calibration.py
@dataclass
class CalibrationEvaluation:
    """A single calibration evaluation result.

    This dataclass represents a parameter set evaluation from calibration,
    including the parameters, loss value, and optionally predictions.

    Attributes
    ----------
    parameters : list[float]
        Parameter values for this evaluation
    loss : float
        Loss/objective function value
    parameter_names : list[str]
        Names of the parameters (in same order as parameters list)
    predictions : list[list[float]] | None
        Optional predictions matrix with shape (time_steps, compartments)
    """

    parameters: list[float]
    loss: float
    parameter_names: list[str]
    predictions: list[list[float]] | None = None

    def to_dict(self) -> dict[str, float]:
        """Convert to a dictionary mapping parameter names to values.

        Returns
        -------
        dict[str, float]
            Dictionary with parameter names as keys and values as values
        """
        return {name: self.parameters[i] for i, name in enumerate(self.parameter_names)}

Functions

to_dict

to_dict() -> dict[str, float]

Convert to a dictionary mapping parameter names to values.

Returns:

Type Description
dict[str, float]

Dictionary with parameter names as keys and values as values

Source code in commol/context/probabilistic_calibration.py
def to_dict(self) -> dict[str, float]:
    """Convert to a dictionary mapping parameter names to values.

    Returns
    -------
    dict[str, float]
        Dictionary with parameter names as keys and values as values
    """
    return {name: self.parameters[i] for i, name in enumerate(self.parameter_names)}

options: show_root_heading: true show_source: false heading_level: 3 show_docstring_attributes: true

Visualization

The SimulationPlotter class supports visualization of probabilistic calibration results with confidence intervals.

Plotting with ProbabilisticCalibrationResult

When a ProbabilisticCalibrationResult is passed to plot_series or plot_cumulative, the plotter automatically displays:

  • Median predictions as the main line
  • Confidence interval bands as shaded regions
  • Observed data as scatter points
from commol import SimulationPlotter

# Create plotter with median predictions from selected ensemble
plotter = SimulationPlotter(simulation, result.selected_ensemble.prediction_median)

# Plot with confidence intervals
fig = plotter.plot_series(
    observed_data=observed_data,
    calibration_result=result,
    output_file="probabilistic_fit.png",
)

# Access the selected ensemble solution
selected = result.selected_ensemble
print(f"Ensemble size: {selected.ensemble_size}")
print(f"Coverage: {selected.coverage_percentage:.2f}%")
print(f"Parameter estimates: {selected.parameter_statistics}")

# Explore alternative solutions on the Pareto front
for i, solution in enumerate(result.pareto_front):
    print(f"Solution {i}: size={solution.ensemble_size}, "
          f"coverage={solution.coverage_percentage:.2f}%, "
          f"CI width={solution.average_ci_width:.4f}")

SimulationPlotter.plot_series

plot_series

plot_series(output_file: str | None = None, observed_data: list[ObservedDataPoint] | None = None, calibration_result: CalibrationResult | ProbabilisticCalibrationResult | None = None, config: PlotConfig | None = None, bins: list[str] | None = None, **kwargs: str | int | float | bool | None) -> Figure

Plot simulation results as time series with one subplot per bin.

Creates a figure with subplots arranged in a grid, where each subplot shows the evolution of one bin over time. Optionally overlays observed data points. If a ProbabilisticCalibrationResult is provided, plots confidence intervals.

Parameters:

Name Type Description Default
output_file str | None

Path to save the figure. If None, figure is not saved (only returned).

None
observed_data list[ObservedDataPoint] | None

Optional observed data points to overlay on corresponding bin subplots. Observed data points with a scale_id will be unscaled for plotting using scale values from the calibration_result.

None
calibration_result CalibrationResult | ProbabilisticCalibrationResult | None

Optional calibration result. If ProbabilisticCalibrationResult is provided, plots the median prediction with confidence interval bands. Scale values are extracted from best_parameters (CalibrationResult) or parameter_statistics (ProbabilisticCalibrationResult) to unscale observed data for comparison with model predictions.

None
config PlotConfig | None

Configuration for plot layout and styling (figsize, dpi, layout, style, palette, context). If None, uses defaults.

None
bins list[str] | None

List of bin IDs to plot. If None, plots all bins.

None
**kwargs str | int | float | bool | None

Additional keyword arguments passed to seaborn.lineplot(). Common parameters: linewidth, alpha, linestyle, marker, etc.

{}

Returns:

Type Description
Figure

The matplotlib Figure object.

Source code in commol/api/plotter.py
def plot_series(
    self,
    output_file: str | None = None,
    observed_data: list[ObservedDataPoint] | None = None,
    calibration_result: CalibrationResult
    | ProbabilisticCalibrationResult
    | None = None,
    config: PlotConfig | None = None,
    bins: list[str] | None = None,
    **kwargs: str | int | float | bool | None,
) -> "Figure":
    """
    Plot simulation results as time series with one subplot per bin.

    Creates a figure with subplots arranged in a grid, where each subplot shows
    the evolution of one bin over time. Optionally overlays observed data points.
    If a ProbabilisticCalibrationResult is provided, plots confidence intervals.

    Parameters
    ----------
    output_file : str | None
        Path to save the figure. If None, figure is not saved (only returned).
    observed_data : list[ObservedDataPoint] | None
        Optional observed data points to overlay on corresponding bin subplots.
        Observed data points with a scale_id will be unscaled for plotting using
        scale values from the calibration_result.
    calibration_result : CalibrationResult | ProbabilisticCalibrationResult | None
        Optional calibration result. If ProbabilisticCalibrationResult is provided,
        plots the median prediction with confidence interval bands.
        Scale values are extracted from best_parameters (CalibrationResult) or
        parameter_statistics (ProbabilisticCalibrationResult) to unscale observed
        data for comparison with model predictions.
    config : PlotConfig | None
        Configuration for plot layout and styling (figsize, dpi, layout,
        style, palette, context). If None, uses defaults.
    bins : list[str] | None
        List of bin IDs to plot. If None, plots all bins.
    **kwargs : str | int | float | bool | None
        Additional keyword arguments passed to seaborn.lineplot().
        Common parameters: linewidth, alpha, linestyle, marker, etc.

    Returns
    -------
    Figure
        The matplotlib Figure object.
    """
    logger.info("Starting plot_series")

    config = config or PlotConfig()
    bins_to_plot = bins if bins is not None else self.bins

    self._apply_seaborn_style(config)
    scale_values = self._extract_scale_values(calibration_result)

    observed_by_bin = self._group_observed_data(observed_data)

    fig, axes = self._create_series_figure(config, bins_to_plot)
    self._plot_all_series_bins(
        axes,
        bins_to_plot,
        observed_by_bin,
        scale_values,
        calibration_result,
        kwargs,
    )
    self._finalize_series_plot(axes, bins_to_plot, output_file, config)

    return fig

options: show_root_heading: false show_source: false heading_level: 4

SimulationPlotter.plot_cumulative

plot_cumulative

plot_cumulative(output_file: str | None = None, observed_data: list[ObservedDataPoint] | None = None, calibration_result: CalibrationResult | ProbabilisticCalibrationResult | None = None, config: PlotConfig | None = None, bins: list[str] | None = None, **kwargs: str | int | float | bool | None) -> Figure

Plot cumulative (accumulated) simulation results with one subplot per bin.

Creates a figure showing the running sum of each bin's values over time. Useful for tracking total infections, deaths, or other accumulated quantities. If a ProbabilisticCalibrationResult is provided, plots confidence intervals.

Parameters:

Name Type Description Default
output_file str | None

Path to save the figure. If None, figure is not saved (only returned).

None
observed_data list[ObservedDataPoint] | None

Optional observed data points to overlay (also shown as cumulative). Observed data points with a scale_id will be unscaled for plotting using scale values from the calibration_result.

None
calibration_result CalibrationResult | ProbabilisticCalibrationResult | None

Optional calibration result. If ProbabilisticCalibrationResult is provided, plots the median prediction with confidence interval bands. Scale values are extracted from best_parameters (CalibrationResult) or parameter_statistics (ProbabilisticCalibrationResult) to unscale observed data for comparison with model predictions.

None
config PlotConfig | None

Configuration for plot layout and styling (figsize, dpi, layout, style, palette, context). If None, uses defaults.

None
bins list[str] | None

List of bin IDs to plot. If None, plots all bins.

None
**kwargs str | int | float | bool | None

Additional keyword arguments passed to seaborn.lineplot(). Common parameters: linewidth, alpha, linestyle, marker, etc.

{}

Returns:

Type Description
Figure

The matplotlib Figure object.

Source code in commol/api/plotter.py
def plot_cumulative(
    self,
    output_file: str | None = None,
    observed_data: list[ObservedDataPoint] | None = None,
    calibration_result: CalibrationResult
    | ProbabilisticCalibrationResult
    | None = None,
    config: PlotConfig | None = None,
    bins: list[str] | None = None,
    **kwargs: str | int | float | bool | None,
) -> "Figure":
    """
    Plot cumulative (accumulated) simulation results with one subplot per bin.

    Creates a figure showing the running sum of each bin's values over time.
    Useful for tracking total infections, deaths, or other accumulated quantities.
    If a ProbabilisticCalibrationResult is provided, plots confidence intervals.

    Parameters
    ----------
    output_file : str | None
        Path to save the figure. If None, figure is not saved (only returned).
    observed_data : list[ObservedDataPoint] | None
        Optional observed data points to overlay (also shown as cumulative).
        Observed data points with a scale_id will be unscaled for plotting using
        scale values from the calibration_result.
    calibration_result : CalibrationResult | ProbabilisticCalibrationResult | None
        Optional calibration result. If ProbabilisticCalibrationResult is provided,
        plots the median prediction with confidence interval bands.
        Scale values are extracted from best_parameters (CalibrationResult) or
        parameter_statistics (ProbabilisticCalibrationResult) to unscale observed
        data for comparison with model predictions.
    config : PlotConfig | None
        Configuration for plot layout and styling (figsize, dpi, layout,
        style, palette, context). If None, uses defaults.
    bins : list[str] | None
        List of bin IDs to plot. If None, plots all bins.
    **kwargs : str | int | float | bool | None
        Additional keyword arguments passed to seaborn.lineplot().
        Common parameters: linewidth, alpha, linestyle, marker, etc.

    Returns
    -------
    Figure
        The matplotlib Figure object.
    """
    logger.info("Starting plot_cumulative")

    config = config or PlotConfig()
    bins_to_plot = bins if bins is not None else self.bins

    self._apply_seaborn_style(config)
    scale_values = self._extract_scale_values(calibration_result)

    observed_by_bin = self._group_observed_data(observed_data)
    cumulative_observed = self._calculate_cumulative_observed(
        observed_by_bin, scale_values or {}
    )

    fig, axes = self._create_cumulative_figure(config, bins_to_plot)
    self._plot_all_cumulative_bins(
        axes, bins_to_plot, cumulative_observed, calibration_result, kwargs
    )
    self._finalize_cumulative_plot(axes, bins_to_plot, output_file, config)

    return fig

options: show_root_heading: false show_source: false heading_level: 4

Helper Classes

The probabilistic calibration workflow is orchestrated using focused helper classes:

CalibrationRunner

Runs multiple calibrations in parallel with different random seeds.

CalibrationRunner

Handles running multiple calibrations in parallel.

This class is responsible for: - Converting Python types to Rust types for the calibration problem - Executing parallel calibration runs via Rust/Rayon - Returning calibration results with evaluation history

Parameters:

Name Type Description Default
simulation Simulation

A fully initialized Simulation object with the model to calibrate.

required
problem CalibrationProblem

A fully constructed and validated calibration problem definition.

required
seed int

Random seed for reproducibility. Each calibration run gets a derived seed (seed + run_index).

required
Source code in commol/api/probabilistic/calibration_runner.py
class CalibrationRunner:
    """Handles running multiple calibrations in parallel.

    This class is responsible for:
    - Converting Python types to Rust types for the calibration problem
    - Executing parallel calibration runs via Rust/Rayon
    - Returning calibration results with evaluation history

    Parameters
    ----------
    simulation : Simulation
        A fully initialized Simulation object with the model to calibrate.
    problem : CalibrationProblem
        A fully constructed and validated calibration problem definition.
    seed : int
        Random seed for reproducibility. Each calibration run gets a derived seed
        (seed + run_index).
    """

    def __init__(
        self,
        simulation: "Simulation",
        problem: "CalibrationProblem",
        seed: int,
    ):
        self.simulation = simulation
        self.problem = problem
        self.seed = seed

    def run_multiple(
        self,
        n_runs: int,
    ) -> list["CalibrationResultWithHistoryProtocol"]:
        """Run multiple calibration attempts in parallel using Rust.

        Parameters
        ----------
        n_runs : int
            Number of calibration runs to perform.

        Returns
        -------
        list[CalibrationResultWithHistoryProtocol]
            List of calibration results with evaluation history.

        Raises
        ------
        RuntimeError
            If all calibration runs fail.
        """
        logger.info(f"Running {n_runs} calibrations in parallel")

        # Convert observed data to Rust types
        rust_observed_data = [
            commol_rs.calibration.ObservedDataPoint(
                step=point.step,
                compartment=point.compartment,
                value=point.value,
                weight=point.weight,
                scale_id=point.scale_id,
            )
            for point in self.problem.observed_data
        ]

        # Convert parameters to Rust types
        rust_parameters = [
            commol_rs.calibration.CalibrationParameter(
                id=param.id,
                parameter_type=self._to_rust_parameter_type(param.parameter_type),
                min_bound=param.min_bound,
                max_bound=param.max_bound,
                initial_guess=param.initial_guess,
            )
            for param in self.problem.parameters
        ]

        # Convert constraints to Rust types
        rust_constraints = [
            commol_rs.calibration.CalibrationConstraint(
                id=constraint.id,
                expression=constraint.expression,
                description=constraint.description,
                weight=constraint.weight,
                time_steps=constraint.time_steps,
            )
            for constraint in self.problem.constraints
        ]

        # Convert loss and optimization configs
        rust_loss_config = self._build_loss_config()
        rust_optimization_config = self._build_optimization_config()

        # Get initial population size
        initial_population_size = self._get_initial_population_size()

        # Call Rust function for parallel execution
        try:
            results = commol_rs.calibration.run_multiple_calibrations(
                self.simulation.engine,
                rust_observed_data,
                rust_parameters,
                rust_constraints,
                rust_loss_config,
                rust_optimization_config,
                initial_population_size,
                n_runs,
                self.seed,
            )
            logger.info(f"Completed {len(results)}/{n_runs} calibrations successfully")
            return results
        except Exception as e:
            raise RuntimeError(f"Parallel calibrations failed: {e}") from e

    def _to_rust_parameter_type(
        self, param_type: str
    ) -> "CalibrationParameterTypeProtocol":
        """Convert Python CalibrationParameterType to Rust type."""
        if param_type == CalibrationParameterType.PARAMETER:
            return commol_rs.calibration.CalibrationParameterType.Parameter
        elif param_type == CalibrationParameterType.INITIAL_CONDITION:
            return commol_rs.calibration.CalibrationParameterType.InitialCondition
        elif param_type == CalibrationParameterType.SCALE:
            return commol_rs.calibration.CalibrationParameterType.Scale
        else:
            raise ValueError(f"Unknown parameter type: {param_type}")

    def _build_loss_config(self) -> "LossConfigProtocol":
        """Convert Python loss function to Rust LossConfig."""
        loss_func = self.problem.loss_function

        if loss_func == LossFunction.SSE:
            return commol_rs.calibration.LossConfig.sse()
        elif loss_func == LossFunction.RMSE:
            return commol_rs.calibration.LossConfig.rmse()
        elif loss_func == LossFunction.MAE:
            return commol_rs.calibration.LossConfig.mae()
        elif loss_func == LossFunction.WEIGHTED_SSE:
            return commol_rs.calibration.LossConfig.weighted_sse()
        else:
            raise ValueError(f"Unsupported loss function: {loss_func}")

    def _build_optimization_config(self) -> "OptimizationConfigProtocol":
        """Convert Python OptimizationConfig to Rust OptimizationConfig."""
        from commol.context.calibration import (
            NelderMeadConfig,
            ParticleSwarmConfig,
        )

        opt_config = self.problem.optimization_config

        if isinstance(opt_config, NelderMeadConfig):
            nm_config = commol_rs.calibration.NelderMeadConfig(
                max_iterations=opt_config.max_iterations,
                sd_tolerance=opt_config.sd_tolerance,
                alpha=opt_config.alpha,
                gamma=opt_config.gamma,
                rho=opt_config.rho,
                sigma=opt_config.sigma,
                verbose=opt_config.verbose,
                header_interval=opt_config.header_interval,
            )
            return commol_rs.calibration.OptimizationConfig.nelder_mead(nm_config)

        elif isinstance(opt_config, ParticleSwarmConfig):
            return self._build_pso_config(opt_config)

        else:
            raise ValueError(
                f"Unsupported optimization config type: {type(opt_config).__name__}"
            )

    def _build_pso_config(self, opt_config) -> "OptimizationConfigProtocol":
        """Convert ParticleSwarmConfig to Rust OptimizationConfig."""
        rust_inertia = self._build_rust_inertia(opt_config.inertia_config)
        rust_acceleration = self._build_rust_acceleration(
            opt_config.acceleration_config
        )

        mutation = opt_config.mutation_config
        rust_mutation = None
        if mutation is not None:
            rust_mutation = commol_rs.calibration.PSOMutation(
                strategy=mutation.strategy,
                scale=mutation.scale,
                probability=mutation.probability,
                application=mutation.application,
            )

        velocity = opt_config.velocity_config
        rust_velocity = None
        if velocity is not None:
            rust_velocity = commol_rs.calibration.PSOVelocity(
                clamp_factor=velocity.clamp_factor,
                mutation_threshold=velocity.mutation_threshold,
            )

        ps_config = commol_rs.calibration.ParticleSwarmConfig(
            num_particles=opt_config.num_particles,
            max_iterations=opt_config.max_iterations,
            verbose=opt_config.verbose,
            inertia=rust_inertia,
            acceleration=rust_acceleration,
            mutation=rust_mutation,
            velocity=rust_velocity,
            initialization=opt_config.initialization,
            seed=self.problem.seed,
        )
        return commol_rs.calibration.OptimizationConfig.particle_swarm(ps_config)

    def _build_rust_inertia(
        self, inertia: PSOConstantInertia | PSOChaoticInertia | None
    ):
        """Convert Python inertia config to Rust type."""
        if inertia is None:
            return None
        if isinstance(inertia, PSOConstantInertia):
            return commol_rs.calibration.PSOInertiaConstant(factor=inertia.factor)
        if isinstance(inertia, PSOChaoticInertia):
            return commol_rs.calibration.PSOInertiaChaotic(
                w_min=inertia.w_min, w_max=inertia.w_max
            )
        return None

    def _build_rust_acceleration(
        self, acceleration: PSOConstantAcceleration | PSOTimeVaryingAcceleration | None
    ):
        """Convert Python acceleration config to Rust type."""
        if acceleration is None:
            return None
        if isinstance(acceleration, PSOConstantAcceleration):
            return commol_rs.calibration.PSOAccelerationConstant(
                cognitive=acceleration.cognitive,
                social=acceleration.social,
            )
        if isinstance(acceleration, PSOTimeVaryingAcceleration):
            return commol_rs.calibration.PSOAccelerationTimeVarying(
                c1_initial=acceleration.c1_initial,
                c1_final=acceleration.c1_final,
                c2_initial=acceleration.c2_initial,
                c2_final=acceleration.c2_final,
            )
        return None

    def _get_initial_population_size(self) -> int:
        """Get the initial population size from the model."""
        initial_conditions = (
            self.simulation.model_definition.population.initial_conditions
        )
        return initial_conditions.population_size

Functions

run_multiple

run_multiple(n_runs: int) -> list[CalibrationResultWithHistoryProtocol]

Run multiple calibration attempts in parallel using Rust.

Parameters:

Name Type Description Default
n_runs int

Number of calibration runs to perform.

required

Returns:

Type Description
list[CalibrationResultWithHistoryProtocol]

List of calibration results with evaluation history.

Raises:

Type Description
RuntimeError

If all calibration runs fail.

Source code in commol/api/probabilistic/calibration_runner.py
def run_multiple(
    self,
    n_runs: int,
) -> list["CalibrationResultWithHistoryProtocol"]:
    """Run multiple calibration attempts in parallel using Rust.

    Parameters
    ----------
    n_runs : int
        Number of calibration runs to perform.

    Returns
    -------
    list[CalibrationResultWithHistoryProtocol]
        List of calibration results with evaluation history.

    Raises
    ------
    RuntimeError
        If all calibration runs fail.
    """
    logger.info(f"Running {n_runs} calibrations in parallel")

    # Convert observed data to Rust types
    rust_observed_data = [
        commol_rs.calibration.ObservedDataPoint(
            step=point.step,
            compartment=point.compartment,
            value=point.value,
            weight=point.weight,
            scale_id=point.scale_id,
        )
        for point in self.problem.observed_data
    ]

    # Convert parameters to Rust types
    rust_parameters = [
        commol_rs.calibration.CalibrationParameter(
            id=param.id,
            parameter_type=self._to_rust_parameter_type(param.parameter_type),
            min_bound=param.min_bound,
            max_bound=param.max_bound,
            initial_guess=param.initial_guess,
        )
        for param in self.problem.parameters
    ]

    # Convert constraints to Rust types
    rust_constraints = [
        commol_rs.calibration.CalibrationConstraint(
            id=constraint.id,
            expression=constraint.expression,
            description=constraint.description,
            weight=constraint.weight,
            time_steps=constraint.time_steps,
        )
        for constraint in self.problem.constraints
    ]

    # Convert loss and optimization configs
    rust_loss_config = self._build_loss_config()
    rust_optimization_config = self._build_optimization_config()

    # Get initial population size
    initial_population_size = self._get_initial_population_size()

    # Call Rust function for parallel execution
    try:
        results = commol_rs.calibration.run_multiple_calibrations(
            self.simulation.engine,
            rust_observed_data,
            rust_parameters,
            rust_constraints,
            rust_loss_config,
            rust_optimization_config,
            initial_population_size,
            n_runs,
            self.seed,
        )
        logger.info(f"Completed {len(results)}/{n_runs} calibrations successfully")
        return results
    except Exception as e:
        raise RuntimeError(f"Parallel calibrations failed: {e}") from e

options: show_root_heading: true show_source: false heading_level: 3 members: - run_multiple

EvaluationProcessor

Handles deduplication, filtering, and clustering of calibration evaluations.

EvaluationProcessor

Handles processing of calibration evaluations.

This class is responsible for: - Collecting evaluations from calibration results - Deduplicating similar evaluations - Filtering evaluations by loss percentile - Clustering evaluations using K-means - Selecting cluster representatives

Parameters:

Name Type Description Default
deduplication_tolerance float

Tolerance for parameter deduplication (default: 1e-6)

1e-06
seed int

Random seed for reproducibility (required, must be 32-bit for sklearn compatibility)

required
min_evaluations_for_clustering int

Minimum number of evaluations required for clustering analysis

10
identical_solutions_atol float

Absolute tolerance for checking if all solutions are identical

1e-10
silhouette_threshold float

Silhouette score threshold for determining if clustering is beneficial

0.2
silhouette_excellent_threshold float

Early stopping threshold for silhouette score search

0.7
kmeans_max_iter int

Maximum iterations for K-means clustering

100
kmeans_algorithm str

K-means algorithm to use

'elkan'
Source code in commol/api/probabilistic/evaluation_processor.py
class EvaluationProcessor:
    """Handles processing of calibration evaluations.

    This class is responsible for:
    - Collecting evaluations from calibration results
    - Deduplicating similar evaluations
    - Filtering evaluations by loss percentile
    - Clustering evaluations using K-means
    - Selecting cluster representatives

    Parameters
    ----------
    deduplication_tolerance : float
        Tolerance for parameter deduplication (default: 1e-6)
    seed : int
        Random seed for reproducibility (required, must be 32-bit for sklearn
        compatibility)
    min_evaluations_for_clustering : int
        Minimum number of evaluations required for clustering analysis
    identical_solutions_atol : float
        Absolute tolerance for checking if all solutions are identical
    silhouette_threshold : float
        Silhouette score threshold for determining if clustering is beneficial
    silhouette_excellent_threshold : float
        Early stopping threshold for silhouette score search
    kmeans_max_iter : int
        Maximum iterations for K-means clustering
    kmeans_algorithm : str
        K-means algorithm to use
    """

    def __init__(
        self,
        seed: int,
        deduplication_tolerance: float = 1e-6,
        min_evaluations_for_clustering: int = 10,
        identical_solutions_atol: float = 1e-10,
        silhouette_threshold: float = 0.2,
        silhouette_excellent_threshold: float = 0.7,
        kmeans_max_iter: int = 100,
        kmeans_algorithm: str = "elkan",
    ):
        self.deduplication_tolerance = deduplication_tolerance
        self.seed = seed
        self.min_evaluations_for_clustering = min_evaluations_for_clustering
        self.identical_solutions_atol = identical_solutions_atol
        self.silhouette_threshold = silhouette_threshold
        self.silhouette_excellent_threshold = silhouette_excellent_threshold
        self.kmeans_max_iter = kmeans_max_iter
        self.kmeans_algorithm = kmeans_algorithm

    def collect_evaluations(
        self, results: list["CalibrationResultWithHistoryProtocol"]
    ) -> list[CalibrationEvaluation]:
        """Collect all parameter evaluations from calibration results.

        Parameters
        ----------
        results : list[CalibrationResultWithHistoryProtocol]
            List of calibration results with evaluation history

        Returns
        -------
        list[CalibrationEvaluation]
            List of all evaluations collected from the results
        """
        evaluations: list[CalibrationEvaluation] = []

        for idx, result in enumerate(results):
            # Collect ALL evaluations from this run, not just the best one
            # This gives us a diverse set of parameter combinations explored during
            # optimization
            if hasattr(result, "evaluations") and len(result.evaluations) > 0:
                for eval_obj in result.evaluations:
                    evaluations.append(
                        CalibrationEvaluation(
                            parameters=list(eval_obj.parameters),
                            loss=eval_obj.loss,
                            parameter_names=list(result.parameter_names),
                        )
                    )
                logger.debug(
                    f"Run {idx + 1}: collected {len(result.evaluations)} evaluations, "
                    f"best loss={result.final_loss:.6f}"
                )
            else:
                # Fallback: if no evaluations history, just use the best result
                evaluations.append(
                    CalibrationEvaluation(
                        parameters=list(result.best_parameters.values()),
                        loss=result.final_loss,
                        parameter_names=list(result.best_parameters.keys()),
                    )
                )
                logger.debug(
                    f"Run {idx + 1}: no evaluation history, using best only: "
                    f"loss={result.final_loss:.6f}"
                )

        return evaluations

    def deduplicate(
        self, evaluations: list[CalibrationEvaluation]
    ) -> list[CalibrationEvaluation]:
        """Remove duplicate evaluations based on parameter similarity using Rust.

        Parameters
        ----------
        evaluations : list[CalibrationEvaluation]
            List of evaluations to deduplicate

        Returns
        -------
        list[CalibrationEvaluation]
            List of unique evaluations
        """
        if not evaluations:
            return []

        # Convert to Rust CalibrationEvaluation objects
        rust_evaluations = [
            commol_rs.calibration.CalibrationEvaluation(
                parameters=e.parameters,
                loss=e.loss,
                predictions=e.predictions or [],
            )
            for e in evaluations
        ]

        # Call Rust deduplication (O(n) average case using spatial hashing)
        unique_rust = commol_rs.calibration.deduplicate_evaluations(
            rust_evaluations, self.deduplication_tolerance
        )

        # Convert back to Python dataclass
        unique: list[CalibrationEvaluation] = []
        param_names = evaluations[0].parameter_names
        for eval_obj in unique_rust:
            unique.append(
                CalibrationEvaluation(
                    parameters=list(eval_obj.parameters),
                    loss=eval_obj.loss,
                    parameter_names=param_names,
                    predictions=list(eval_obj.predictions)
                    if eval_obj.predictions
                    else None,
                )
            )

        return unique

    def filter_by_loss_percentile(
        self,
        evaluations: list[CalibrationEvaluation],
        percentile: float,
    ) -> list[CalibrationEvaluation]:
        """Filter evaluations to keep only the best N% by loss.

        Parameters
        ----------
        evaluations : list[CalibrationEvaluation]
            List of evaluations to filter
        percentile : float
            Fraction (0.0 - 1.0] of best solutions to keep

        Returns
        -------
        list[CalibrationEvaluation]
            Filtered list containing only the best solutions by loss
        """
        if not evaluations or percentile >= 1.0:
            return evaluations

        # Sort by loss (ascending - lower is better)
        sorted_evaluations = sorted(evaluations, key=lambda e: e.loss)

        # Calculate how many to keep
        n_to_keep = max(1, int(len(sorted_evaluations) * percentile))

        # Return the best N%
        return sorted_evaluations[:n_to_keep]

    def find_optimal_k(self, evaluations: list[CalibrationEvaluation]) -> int:
        """Automatically determine optimal number of clusters using silhouette analysis.

        Returns 1 if there's no clear clustering structure (all solutions are similar),
        otherwise returns the optimal K based on silhouette scores.

        Parameters
        ----------
        evaluations : list[CalibrationEvaluation]
            List of evaluations to analyze

        Returns
        -------
        int
            Optimal number of clusters (1 if no clear structure)
        """
        n_evaluations = len(evaluations)

        # If we have very few evaluations, no need to cluster
        if n_evaluations < self.min_evaluations_for_clustering:
            logger.info(
                f"Too few evaluations ({n_evaluations}) for clustering, "
                "using single cluster"
            )
            return 1

        # Extract parameter vectors
        param_vectors = np.array([e.parameters for e in evaluations])

        # Check if all solutions are essentially identical (no variance)
        if np.allclose(
            param_vectors.std(axis=0), 0, atol=self.identical_solutions_atol
        ):
            logger.info("All solutions are identical, using single cluster")
            return 1

        # Determine range for K to test
        # Use a more conservative upper bound to reduce computation
        # Test K values up to sqrt(n)/2 or 10, whichever is smaller
        min_k = 2
        max_k = min(max(2, int(np.sqrt(n_evaluations)) // 2), 10)

        # If we can't test multiple K values, return 1 cluster
        if min_k > max_k or n_evaluations < 2 * min_k:
            logger.info(
                f"Not enough evaluations to test clustering (n={n_evaluations}), "
                "using single cluster"
            )
            return 1

        # Calculate silhouette scores for different K values
        silhouette_scores: dict[int, float] = {}
        k_range = range(min_k, max_k + 1)

        for k in k_range:
            try:
                kmeans = KMeans(
                    n_clusters=k,
                    random_state=self.seed,
                    n_init="auto",
                    max_iter=self.kmeans_max_iter,
                    algorithm=self.kmeans_algorithm,
                )
                labels = kmeans.fit_predict(param_vectors)

                # Silhouette score requires at least 2 clusters with samples
                if len(np.unique(labels)) < 2:
                    silhouette_scores[k] = -1.0
                else:
                    score = silhouette_score(param_vectors, labels)
                    silhouette_scores[k] = score

                    # Early stopping if excellent clustering found
                    if score > self.silhouette_excellent_threshold:
                        logger.info(
                            f"Found excellent clustering at k={k} "
                            f"(silhouette={score:.3f}), stopping search"
                        )
                        break

            except Exception as e:
                logger.warning(f"Failed to compute silhouette score for k={k}: {e}")
                silhouette_scores[k] = -1.0

        # Get the best silhouette score and corresponding K
        if not silhouette_scores:
            logger.warning("No valid silhouette scores computed, using single cluster")
            return 1

        best_k = max(silhouette_scores.items(), key=lambda x: x[1])
        optimal_k, best_score = best_k

        if best_score < self.silhouette_threshold:
            logger.info(
                f"Best silhouette score ({best_score:.3f}) below threshold "
                f"({self.silhouette_threshold}), indicating no clear clustering "
                "structure. Using single cluster."
            )
            return 1

        logger.info(
            f"Found {optimal_k} clusters with silhouette score {best_score:.3f}"
        )

        return optimal_k

    def cluster_evaluations(
        self,
        evaluations: list[CalibrationEvaluation],
        k: int,
    ) -> list[int]:
        """Cluster evaluations using K-means.

        If k=1, all evaluations are assigned to a single cluster.

        Parameters
        ----------
        evaluations : list[CalibrationEvaluation]
            List of evaluations to cluster
        k : int
            Number of clusters

        Returns
        -------
        list[int]
            List of cluster labels (one per evaluation)
        """
        if k == 1:
            # Single cluster - no need for K-means
            logger.info("Single cluster: all evaluations grouped together")
            return [0] * len(evaluations)

        param_vectors = np.array([e.parameters for e in evaluations])

        kmeans = KMeans(n_clusters=k, random_state=self.seed, n_init="auto")
        labels = kmeans.fit_predict(param_vectors)

        return list(labels)

    def select_representatives(
        self,
        evaluations: list[CalibrationEvaluation],
        cluster_labels: list[int],
        max_representatives: int,
        elite_fraction: float,
        strategy: str,
        selection_method: str,
        quality_temperature: float,
        k_neighbors_min: int,
        k_neighbors_max: int,
        sparsity_weight: float,
        stratum_fit_weight: float,
    ) -> list[int]:
        """Select representative evaluations from clusters using Rust.

        Parameters
        ----------
        evaluations : list[CalibrationEvaluation]
            List of all evaluations
        cluster_labels : list[int]
            Cluster assignment for each evaluation
        max_representatives : int
            Maximum total representatives to select
        elite_fraction : float
            Fraction of best solutions to always include (0.0-1.0)
        strategy : str
            Distribution strategy ("proportional" or "equal")
        selection_method : str
            Diversity method ("crowding_distance", "maximin_distance", or
            "latin_hypercube")
        quality_temperature : float
            Temperature for quality weighting in maximin method
        k_neighbors_min : int
            Minimum k for k-nearest neighbors density estimation
        k_neighbors_max : int
            Maximum k for k-nearest neighbors density estimation
        sparsity_weight : float
            Exponential weight for sparsity in density-aware selection
        stratum_fit_weight : float
            Weight for stratum fit quality vs diversity in Latin hypercube

        Returns
        -------
        list[int]
            Indices of selected representative evaluations
        """
        # Convert to Rust types
        rust_evaluations = [
            commol_rs.calibration.CalibrationEvaluation(
                parameters=e.parameters,
                loss=e.loss,
                predictions=e.predictions or [],
            )
            for e in evaluations
        ]

        return commol_rs.calibration.select_cluster_representatives(
            rust_evaluations,
            cluster_labels,
            max_representatives,
            elite_fraction,
            strategy,
            selection_method,
            quality_temperature,
            self.seed,
            k_neighbors_min,
            k_neighbors_max,
            sparsity_weight,
            stratum_fit_weight,
        )

Functions

collect_evaluations

collect_evaluations(results: list[CalibrationResultWithHistoryProtocol]) -> list[CalibrationEvaluation]

Collect all parameter evaluations from calibration results.

Parameters:

Name Type Description Default
results list[CalibrationResultWithHistoryProtocol]

List of calibration results with evaluation history

required

Returns:

Type Description
list[CalibrationEvaluation]

List of all evaluations collected from the results

Source code in commol/api/probabilistic/evaluation_processor.py
def collect_evaluations(
    self, results: list["CalibrationResultWithHistoryProtocol"]
) -> list[CalibrationEvaluation]:
    """Collect all parameter evaluations from calibration results.

    Parameters
    ----------
    results : list[CalibrationResultWithHistoryProtocol]
        List of calibration results with evaluation history

    Returns
    -------
    list[CalibrationEvaluation]
        List of all evaluations collected from the results
    """
    evaluations: list[CalibrationEvaluation] = []

    for idx, result in enumerate(results):
        # Collect ALL evaluations from this run, not just the best one
        # This gives us a diverse set of parameter combinations explored during
        # optimization
        if hasattr(result, "evaluations") and len(result.evaluations) > 0:
            for eval_obj in result.evaluations:
                evaluations.append(
                    CalibrationEvaluation(
                        parameters=list(eval_obj.parameters),
                        loss=eval_obj.loss,
                        parameter_names=list(result.parameter_names),
                    )
                )
            logger.debug(
                f"Run {idx + 1}: collected {len(result.evaluations)} evaluations, "
                f"best loss={result.final_loss:.6f}"
            )
        else:
            # Fallback: if no evaluations history, just use the best result
            evaluations.append(
                CalibrationEvaluation(
                    parameters=list(result.best_parameters.values()),
                    loss=result.final_loss,
                    parameter_names=list(result.best_parameters.keys()),
                )
            )
            logger.debug(
                f"Run {idx + 1}: no evaluation history, using best only: "
                f"loss={result.final_loss:.6f}"
            )

    return evaluations

deduplicate

deduplicate(evaluations: list[CalibrationEvaluation]) -> list[CalibrationEvaluation]

Remove duplicate evaluations based on parameter similarity using Rust.

Parameters:

Name Type Description Default
evaluations list[CalibrationEvaluation]

List of evaluations to deduplicate

required

Returns:

Type Description
list[CalibrationEvaluation]

List of unique evaluations

Source code in commol/api/probabilistic/evaluation_processor.py
def deduplicate(
    self, evaluations: list[CalibrationEvaluation]
) -> list[CalibrationEvaluation]:
    """Remove duplicate evaluations based on parameter similarity using Rust.

    Parameters
    ----------
    evaluations : list[CalibrationEvaluation]
        List of evaluations to deduplicate

    Returns
    -------
    list[CalibrationEvaluation]
        List of unique evaluations
    """
    if not evaluations:
        return []

    # Convert to Rust CalibrationEvaluation objects
    rust_evaluations = [
        commol_rs.calibration.CalibrationEvaluation(
            parameters=e.parameters,
            loss=e.loss,
            predictions=e.predictions or [],
        )
        for e in evaluations
    ]

    # Call Rust deduplication (O(n) average case using spatial hashing)
    unique_rust = commol_rs.calibration.deduplicate_evaluations(
        rust_evaluations, self.deduplication_tolerance
    )

    # Convert back to Python dataclass
    unique: list[CalibrationEvaluation] = []
    param_names = evaluations[0].parameter_names
    for eval_obj in unique_rust:
        unique.append(
            CalibrationEvaluation(
                parameters=list(eval_obj.parameters),
                loss=eval_obj.loss,
                parameter_names=param_names,
                predictions=list(eval_obj.predictions)
                if eval_obj.predictions
                else None,
            )
        )

    return unique

filter_by_loss_percentile

filter_by_loss_percentile(evaluations: list[CalibrationEvaluation], percentile: float) -> list[CalibrationEvaluation]

Filter evaluations to keep only the best N% by loss.

Parameters:

Name Type Description Default
evaluations list[CalibrationEvaluation]

List of evaluations to filter

required
percentile float

Fraction (0.0 - 1.0] of best solutions to keep

required

Returns:

Type Description
list[CalibrationEvaluation]

Filtered list containing only the best solutions by loss

Source code in commol/api/probabilistic/evaluation_processor.py
def filter_by_loss_percentile(
    self,
    evaluations: list[CalibrationEvaluation],
    percentile: float,
) -> list[CalibrationEvaluation]:
    """Filter evaluations to keep only the best N% by loss.

    Parameters
    ----------
    evaluations : list[CalibrationEvaluation]
        List of evaluations to filter
    percentile : float
        Fraction (0.0 - 1.0] of best solutions to keep

    Returns
    -------
    list[CalibrationEvaluation]
        Filtered list containing only the best solutions by loss
    """
    if not evaluations or percentile >= 1.0:
        return evaluations

    # Sort by loss (ascending - lower is better)
    sorted_evaluations = sorted(evaluations, key=lambda e: e.loss)

    # Calculate how many to keep
    n_to_keep = max(1, int(len(sorted_evaluations) * percentile))

    # Return the best N%
    return sorted_evaluations[:n_to_keep]

find_optimal_k

find_optimal_k(evaluations: list[CalibrationEvaluation]) -> int

Automatically determine optimal number of clusters using silhouette analysis.

Returns 1 if there's no clear clustering structure (all solutions are similar), otherwise returns the optimal K based on silhouette scores.

Parameters:

Name Type Description Default
evaluations list[CalibrationEvaluation]

List of evaluations to analyze

required

Returns:

Type Description
int

Optimal number of clusters (1 if no clear structure)

Source code in commol/api/probabilistic/evaluation_processor.py
def find_optimal_k(self, evaluations: list[CalibrationEvaluation]) -> int:
    """Automatically determine optimal number of clusters using silhouette analysis.

    Returns 1 if there's no clear clustering structure (all solutions are similar),
    otherwise returns the optimal K based on silhouette scores.

    Parameters
    ----------
    evaluations : list[CalibrationEvaluation]
        List of evaluations to analyze

    Returns
    -------
    int
        Optimal number of clusters (1 if no clear structure)
    """
    n_evaluations = len(evaluations)

    # If we have very few evaluations, no need to cluster
    if n_evaluations < self.min_evaluations_for_clustering:
        logger.info(
            f"Too few evaluations ({n_evaluations}) for clustering, "
            "using single cluster"
        )
        return 1

    # Extract parameter vectors
    param_vectors = np.array([e.parameters for e in evaluations])

    # Check if all solutions are essentially identical (no variance)
    if np.allclose(
        param_vectors.std(axis=0), 0, atol=self.identical_solutions_atol
    ):
        logger.info("All solutions are identical, using single cluster")
        return 1

    # Determine range for K to test
    # Use a more conservative upper bound to reduce computation
    # Test K values up to sqrt(n)/2 or 10, whichever is smaller
    min_k = 2
    max_k = min(max(2, int(np.sqrt(n_evaluations)) // 2), 10)

    # If we can't test multiple K values, return 1 cluster
    if min_k > max_k or n_evaluations < 2 * min_k:
        logger.info(
            f"Not enough evaluations to test clustering (n={n_evaluations}), "
            "using single cluster"
        )
        return 1

    # Calculate silhouette scores for different K values
    silhouette_scores: dict[int, float] = {}
    k_range = range(min_k, max_k + 1)

    for k in k_range:
        try:
            kmeans = KMeans(
                n_clusters=k,
                random_state=self.seed,
                n_init="auto",
                max_iter=self.kmeans_max_iter,
                algorithm=self.kmeans_algorithm,
            )
            labels = kmeans.fit_predict(param_vectors)

            # Silhouette score requires at least 2 clusters with samples
            if len(np.unique(labels)) < 2:
                silhouette_scores[k] = -1.0
            else:
                score = silhouette_score(param_vectors, labels)
                silhouette_scores[k] = score

                # Early stopping if excellent clustering found
                if score > self.silhouette_excellent_threshold:
                    logger.info(
                        f"Found excellent clustering at k={k} "
                        f"(silhouette={score:.3f}), stopping search"
                    )
                    break

        except Exception as e:
            logger.warning(f"Failed to compute silhouette score for k={k}: {e}")
            silhouette_scores[k] = -1.0

    # Get the best silhouette score and corresponding K
    if not silhouette_scores:
        logger.warning("No valid silhouette scores computed, using single cluster")
        return 1

    best_k = max(silhouette_scores.items(), key=lambda x: x[1])
    optimal_k, best_score = best_k

    if best_score < self.silhouette_threshold:
        logger.info(
            f"Best silhouette score ({best_score:.3f}) below threshold "
            f"({self.silhouette_threshold}), indicating no clear clustering "
            "structure. Using single cluster."
        )
        return 1

    logger.info(
        f"Found {optimal_k} clusters with silhouette score {best_score:.3f}"
    )

    return optimal_k

cluster_evaluations

cluster_evaluations(evaluations: list[CalibrationEvaluation], k: int) -> list[int]

Cluster evaluations using K-means.

If k=1, all evaluations are assigned to a single cluster.

Parameters:

Name Type Description Default
evaluations list[CalibrationEvaluation]

List of evaluations to cluster

required
k int

Number of clusters

required

Returns:

Type Description
list[int]

List of cluster labels (one per evaluation)

Source code in commol/api/probabilistic/evaluation_processor.py
def cluster_evaluations(
    self,
    evaluations: list[CalibrationEvaluation],
    k: int,
) -> list[int]:
    """Cluster evaluations using K-means.

    If k=1, all evaluations are assigned to a single cluster.

    Parameters
    ----------
    evaluations : list[CalibrationEvaluation]
        List of evaluations to cluster
    k : int
        Number of clusters

    Returns
    -------
    list[int]
        List of cluster labels (one per evaluation)
    """
    if k == 1:
        # Single cluster - no need for K-means
        logger.info("Single cluster: all evaluations grouped together")
        return [0] * len(evaluations)

    param_vectors = np.array([e.parameters for e in evaluations])

    kmeans = KMeans(n_clusters=k, random_state=self.seed, n_init="auto")
    labels = kmeans.fit_predict(param_vectors)

    return list(labels)

select_representatives

select_representatives(evaluations: list[CalibrationEvaluation], cluster_labels: list[int], max_representatives: int, elite_fraction: float, strategy: str, selection_method: str, quality_temperature: float, k_neighbors_min: int, k_neighbors_max: int, sparsity_weight: float, stratum_fit_weight: float) -> list[int]

Select representative evaluations from clusters using Rust.

Parameters:

Name Type Description Default
evaluations list[CalibrationEvaluation]

List of all evaluations

required
cluster_labels list[int]

Cluster assignment for each evaluation

required
max_representatives int

Maximum total representatives to select

required
elite_fraction float

Fraction of best solutions to always include (0.0-1.0)

required
strategy str

Distribution strategy ("proportional" or "equal")

required
selection_method str

Diversity method ("crowding_distance", "maximin_distance", or "latin_hypercube")

required
quality_temperature float

Temperature for quality weighting in maximin method

required
k_neighbors_min int

Minimum k for k-nearest neighbors density estimation

required
k_neighbors_max int

Maximum k for k-nearest neighbors density estimation

required
sparsity_weight float

Exponential weight for sparsity in density-aware selection

required
stratum_fit_weight float

Weight for stratum fit quality vs diversity in Latin hypercube

required

Returns:

Type Description
list[int]

Indices of selected representative evaluations

Source code in commol/api/probabilistic/evaluation_processor.py
def select_representatives(
    self,
    evaluations: list[CalibrationEvaluation],
    cluster_labels: list[int],
    max_representatives: int,
    elite_fraction: float,
    strategy: str,
    selection_method: str,
    quality_temperature: float,
    k_neighbors_min: int,
    k_neighbors_max: int,
    sparsity_weight: float,
    stratum_fit_weight: float,
) -> list[int]:
    """Select representative evaluations from clusters using Rust.

    Parameters
    ----------
    evaluations : list[CalibrationEvaluation]
        List of all evaluations
    cluster_labels : list[int]
        Cluster assignment for each evaluation
    max_representatives : int
        Maximum total representatives to select
    elite_fraction : float
        Fraction of best solutions to always include (0.0-1.0)
    strategy : str
        Distribution strategy ("proportional" or "equal")
    selection_method : str
        Diversity method ("crowding_distance", "maximin_distance", or
        "latin_hypercube")
    quality_temperature : float
        Temperature for quality weighting in maximin method
    k_neighbors_min : int
        Minimum k for k-nearest neighbors density estimation
    k_neighbors_max : int
        Maximum k for k-nearest neighbors density estimation
    sparsity_weight : float
        Exponential weight for sparsity in density-aware selection
    stratum_fit_weight : float
        Weight for stratum fit quality vs diversity in Latin hypercube

    Returns
    -------
    list[int]
        Indices of selected representative evaluations
    """
    # Convert to Rust types
    rust_evaluations = [
        commol_rs.calibration.CalibrationEvaluation(
            parameters=e.parameters,
            loss=e.loss,
            predictions=e.predictions or [],
        )
        for e in evaluations
    ]

    return commol_rs.calibration.select_cluster_representatives(
        rust_evaluations,
        cluster_labels,
        max_representatives,
        elite_fraction,
        strategy,
        selection_method,
        quality_temperature,
        self.seed,
        k_neighbors_min,
        k_neighbors_max,
        sparsity_weight,
        stratum_fit_weight,
    )

options: show_root_heading: true show_source: false heading_level: 3 members: - collect_evaluations - deduplicate - filter_by_loss_percentile - find_optimal_k - cluster_evaluations - select_representatives

EnsembleSelector

Runs NSGA-II multi-objective optimization for ensemble selection.

EnsembleSelector

Handles NSGA-II ensemble selection.

This class is responsible for: - Generating predictions for candidate parameter sets - Running NSGA-II multi-objective optimization - Selecting Pareto-optimal ensemble

Parameters:

Name Type Description Default
simulation Simulation

A fully initialized Simulation object

required
problem CalibrationProblem

The calibration problem definition

required
seed int

Random seed for reproducibility in NSGA-II optimization

required
Source code in commol/api/probabilistic/ensemble_selector.py
class EnsembleSelector:
    """Handles NSGA-II ensemble selection.

    This class is responsible for:
    - Generating predictions for candidate parameter sets
    - Running NSGA-II multi-objective optimization
    - Selecting Pareto-optimal ensemble

    Parameters
    ----------
    simulation : Simulation
        A fully initialized Simulation object
    problem : CalibrationProblem
        The calibration problem definition
    seed : int
        Random seed for reproducibility in NSGA-II optimization
    """

    def __init__(
        self,
        simulation: "Simulation",
        problem: "CalibrationProblem",
        seed: int,
    ):
        self.simulation = simulation
        self.problem = problem
        self.seed = seed
        self._compartment_name_to_idx = {
            bin.id: idx
            for idx, bin in enumerate(simulation.model_definition.population.bins)
        }

    def select_ensemble(
        self,
        representatives: list[CalibrationEvaluation],
        nsga_population_size: int,
        nsga_generations: int,
        confidence_level: float,
        pareto_preference: float,
        ensemble_size_mode: str,
        ensemble_size: int | None,
        ensemble_size_min: int | None,
        ensemble_size_max: int | None,
        ci_margin_factor: float,
        ci_sample_sizes: list[int],
        nsga_crossover_probability: float,
    ) -> "EnsembleSelectionResultProtocol":
        """Run NSGA-II ensemble selection.

        Parameters
        ----------
        representatives : list[CalibrationEvaluation]
            Candidate parameter sets for ensemble selection
        nsga_population_size : int
            NSGA-II population size
        nsga_generations : int
            Number of NSGA-II generations
        confidence_level : float
            Confidence level for CI calculation (e.g., 0.95)
        pareto_preference : float
            Preference for Pareto front selection (0.0-1.0)
        ensemble_size_mode : str
            Mode for determining ensemble size ("fixed", "bounded", "automatic")
        ensemble_size : int | None
            Fixed ensemble size (required if mode='fixed')
        ensemble_size_min : int | None
            Minimum ensemble size (required if mode='bounded')
        ensemble_size_max : int | None
            Maximum ensemble size (required if mode='bounded')
        ci_margin_factor : float
            Margin factor for CI bounds estimation (e.g., 0.1 = 10% margin)
        ci_sample_sizes : list[int]
            Sample sizes used for CI bounds estimation
        nsga_crossover_probability : float
            Crossover probability for NSGA-II (0.0-1.0)

        Returns
        -------
        object
            Rust EnsembleSelectionResult object
        """

        logger.info(
            f"Running NSGA-II ensemble selection on {len(representatives)} candidates"
        )

        # Generate predictions for each representative in parallel
        representatives_with_predictions = self._generate_predictions(representatives)

        # Convert to Rust CalibrationEvaluation objects
        candidates = [
            commol_rs.calibration.CalibrationEvaluation(
                parameters=rep.parameters,
                loss=rep.loss,
                predictions=rep.predictions or [],
            )
            for rep in representatives_with_predictions
        ]

        # Prepare observed data tuples
        observed_data_tuples = [
            (
                obs.step,
                self._compartment_name_to_idx[obs.compartment],
                obs.value,
            )
            for obs in self.problem.observed_data
        ]

        # Run NSGA-II ensemble selection
        logger.info("Running NSGA-II ensemble selection...")
        ensemble_result = commol_rs.calibration.select_optimal_ensemble(
            candidates=candidates,
            observed_data_tuples=observed_data_tuples,
            population_size=nsga_population_size,
            generations=nsga_generations,
            confidence_level=confidence_level,
            seed=self.seed,
            pareto_preference=pareto_preference,
            ensemble_size_mode=ensemble_size_mode,
            ensemble_size=ensemble_size,
            ensemble_size_min=ensemble_size_min,
            ensemble_size_max=ensemble_size_max,
            ci_margin_factor=ci_margin_factor,
            ci_sample_sizes=ci_sample_sizes,
            nsga_crossover_probability=nsga_crossover_probability,
        )

        logger.info(
            f"Selected ensemble of {len(ensemble_result.selected_ensemble)} parameter "
            "sets using NSGA-II"
        )
        logger.info(
            f"Pareto front contains {len(ensemble_result.pareto_front)} solutions"
        )

        return ensemble_result

    def _generate_predictions(
        self,
        representatives: list[CalibrationEvaluation],
    ) -> list[CalibrationEvaluation]:
        """Generate predictions for representative parameter sets in parallel.

        Parameters
        ----------
        representatives : list[CalibrationEvaluation]
            List of parameter sets to generate predictions for

        Returns
        -------
        list[CalibrationEvaluation]
            Representatives with predictions attached
        """
        logger.info(
            "Generating predictions for representative parameter sets in parallel..."
        )

        max_time_step = max(obs.step for obs in self.problem.observed_data)
        time_steps = max_time_step + 1

        # Extract parameter sets and names
        parameter_sets = [rep.parameters for rep in representatives]
        parameter_names = representatives[0].parameter_names

        # Call Rust function to generate all predictions in parallel
        all_predictions = commol_rs.calibration.generate_predictions_parallel(
            self.simulation.engine,
            parameter_sets,
            parameter_names,
            time_steps,
        )

        # Combine predictions with representative data
        result: list[CalibrationEvaluation] = []
        for rep, predictions in zip(representatives, all_predictions):
            result.append(
                CalibrationEvaluation(
                    parameters=rep.parameters,
                    loss=rep.loss,
                    parameter_names=rep.parameter_names,
                    predictions=predictions,
                )
            )

        return result

Functions

select_ensemble

select_ensemble(representatives: list[CalibrationEvaluation], nsga_population_size: int, nsga_generations: int, confidence_level: float, pareto_preference: float, ensemble_size_mode: str, ensemble_size: int | None, ensemble_size_min: int | None, ensemble_size_max: int | None, ci_margin_factor: float, ci_sample_sizes: list[int], nsga_crossover_probability: float) -> EnsembleSelectionResultProtocol

Run NSGA-II ensemble selection.

Parameters:

Name Type Description Default
representatives list[CalibrationEvaluation]

Candidate parameter sets for ensemble selection

required
nsga_population_size int

NSGA-II population size

required
nsga_generations int

Number of NSGA-II generations

required
confidence_level float

Confidence level for CI calculation (e.g., 0.95)

required
pareto_preference float

Preference for Pareto front selection (0.0-1.0)

required
ensemble_size_mode str

Mode for determining ensemble size ("fixed", "bounded", "automatic")

required
ensemble_size int | None

Fixed ensemble size (required if mode='fixed')

required
ensemble_size_min int | None

Minimum ensemble size (required if mode='bounded')

required
ensemble_size_max int | None

Maximum ensemble size (required if mode='bounded')

required
ci_margin_factor float

Margin factor for CI bounds estimation (e.g., 0.1 = 10% margin)

required
ci_sample_sizes list[int]

Sample sizes used for CI bounds estimation

required
nsga_crossover_probability float

Crossover probability for NSGA-II (0.0-1.0)

required

Returns:

Type Description
object

Rust EnsembleSelectionResult object

Source code in commol/api/probabilistic/ensemble_selector.py
def select_ensemble(
    self,
    representatives: list[CalibrationEvaluation],
    nsga_population_size: int,
    nsga_generations: int,
    confidence_level: float,
    pareto_preference: float,
    ensemble_size_mode: str,
    ensemble_size: int | None,
    ensemble_size_min: int | None,
    ensemble_size_max: int | None,
    ci_margin_factor: float,
    ci_sample_sizes: list[int],
    nsga_crossover_probability: float,
) -> "EnsembleSelectionResultProtocol":
    """Run NSGA-II ensemble selection.

    Parameters
    ----------
    representatives : list[CalibrationEvaluation]
        Candidate parameter sets for ensemble selection
    nsga_population_size : int
        NSGA-II population size
    nsga_generations : int
        Number of NSGA-II generations
    confidence_level : float
        Confidence level for CI calculation (e.g., 0.95)
    pareto_preference : float
        Preference for Pareto front selection (0.0-1.0)
    ensemble_size_mode : str
        Mode for determining ensemble size ("fixed", "bounded", "automatic")
    ensemble_size : int | None
        Fixed ensemble size (required if mode='fixed')
    ensemble_size_min : int | None
        Minimum ensemble size (required if mode='bounded')
    ensemble_size_max : int | None
        Maximum ensemble size (required if mode='bounded')
    ci_margin_factor : float
        Margin factor for CI bounds estimation (e.g., 0.1 = 10% margin)
    ci_sample_sizes : list[int]
        Sample sizes used for CI bounds estimation
    nsga_crossover_probability : float
        Crossover probability for NSGA-II (0.0-1.0)

    Returns
    -------
    object
        Rust EnsembleSelectionResult object
    """

    logger.info(
        f"Running NSGA-II ensemble selection on {len(representatives)} candidates"
    )

    # Generate predictions for each representative in parallel
    representatives_with_predictions = self._generate_predictions(representatives)

    # Convert to Rust CalibrationEvaluation objects
    candidates = [
        commol_rs.calibration.CalibrationEvaluation(
            parameters=rep.parameters,
            loss=rep.loss,
            predictions=rep.predictions or [],
        )
        for rep in representatives_with_predictions
    ]

    # Prepare observed data tuples
    observed_data_tuples = [
        (
            obs.step,
            self._compartment_name_to_idx[obs.compartment],
            obs.value,
        )
        for obs in self.problem.observed_data
    ]

    # Run NSGA-II ensemble selection
    logger.info("Running NSGA-II ensemble selection...")
    ensemble_result = commol_rs.calibration.select_optimal_ensemble(
        candidates=candidates,
        observed_data_tuples=observed_data_tuples,
        population_size=nsga_population_size,
        generations=nsga_generations,
        confidence_level=confidence_level,
        seed=self.seed,
        pareto_preference=pareto_preference,
        ensemble_size_mode=ensemble_size_mode,
        ensemble_size=ensemble_size,
        ensemble_size_min=ensemble_size_min,
        ensemble_size_max=ensemble_size_max,
        ci_margin_factor=ci_margin_factor,
        ci_sample_sizes=ci_sample_sizes,
        nsga_crossover_probability=nsga_crossover_probability,
    )

    logger.info(
        f"Selected ensemble of {len(ensemble_result.selected_ensemble)} parameter "
        "sets using NSGA-II"
    )
    logger.info(
        f"Pareto front contains {len(ensemble_result.pareto_front)} solutions"
    )

    return ensemble_result

options: show_root_heading: true show_source: false heading_level: 3 members: - select_ensemble

StatisticsCalculator

Computes ensemble statistics and predictions with confidence intervals.

StatisticsCalculator

Handles calculation of ensemble statistics.

This class is responsible for: - Calculating parameter statistics across the ensemble - Generating ensemble predictions - Computing prediction intervals (median, CI bounds) - Calculating coverage metrics

Parameters:

Name Type Description Default
simulation Simulation

A fully initialized Simulation object

required
problem CalibrationProblem

The calibration problem definition

required
confidence_level float

Confidence level for CI calculation (e.g., 0.95)

0.95
Source code in commol/api/probabilistic/statistics_calculator.py
class StatisticsCalculator:
    """Handles calculation of ensemble statistics.

    This class is responsible for:
    - Calculating parameter statistics across the ensemble
    - Generating ensemble predictions
    - Computing prediction intervals (median, CI bounds)
    - Calculating coverage metrics

    Parameters
    ----------
    simulation : Simulation
        A fully initialized Simulation object
    problem : CalibrationProblem
        The calibration problem definition
    confidence_level : float
        Confidence level for CI calculation (e.g., 0.95)
    """

    def __init__(
        self,
        simulation: "Simulation",
        problem: "CalibrationProblem",
        confidence_level: float = 0.95,
    ):
        self.simulation = simulation
        self.problem = problem
        self.confidence_level = confidence_level

    def calculate_parameter_statistics(
        self,
        ensemble_params: list[CalibrationEvaluation],
    ) -> dict[str, ParameterSetStatistics]:
        """Calculate statistics for each parameter across the ensemble.

        Parameters
        ----------
        ensemble_params : list[CalibrationEvaluation]
            List of parameter sets in the ensemble

        Returns
        -------
        dict[str, ParameterSetStatistics]
            Dictionary mapping parameter names to their statistics
        """
        param_names = ensemble_params[0].parameter_names
        param_values = {
            name: [p.parameters[i] for p in ensemble_params]
            for i, name in enumerate(param_names)
        }

        # Calculate percentile bounds based on confidence level
        ci_lower_percentile = (1.0 - self.confidence_level) / 2.0 * 100
        ci_upper_percentile = (1.0 + self.confidence_level) / 2.0 * 100

        param_statistics = {}
        for name, values in param_values.items():
            param_statistics[name] = ParameterSetStatistics(
                mean=float(np.mean(values)),
                median=float(np.median(values)),
                std=float(np.std(values)),
                percentile_lower=float(np.percentile(values, ci_lower_percentile)),
                percentile_upper=float(np.percentile(values, ci_upper_percentile)),
                min=float(np.min(values)),
                max=float(np.max(values)),
            )
        return param_statistics

    def generate_ensemble_predictions(
        self,
        ensemble_params: list[CalibrationEvaluation],
        compartment_ids: list[str],
        time_steps: int,
    ) -> dict[str, list[list[float]]]:
        """Generate predictions for each ensemble member in parallel using Rust.

        Parameters
        ----------
        ensemble_params : list[CalibrationEvaluation]
            List of parameter sets in the ensemble
        compartment_ids : list[str]
            List of compartment IDs to generate predictions for
        time_steps : int
            Number of time steps to simulate

        Returns
        -------
        dict[str, list[list[float]]]
            Dictionary mapping compartment IDs to list of prediction trajectories
        """
        param_names = ensemble_params[0].parameter_names

        # Extract parameter sets
        parameter_sets = [ep.parameters for ep in ensemble_params]

        # Call Rust function to generate all predictions in parallel
        all_predictions_raw = commol_rs.calibration.generate_predictions_parallel(
            self.simulation.engine,
            parameter_sets,
            param_names,
            time_steps,
        )

        # Reorganize predictions by compartment
        # all_predictions_raw is list[list[list[float]]] where:
        # - outer list: one per parameter set
        # - middle list: one per time step
        # - inner list: one per compartment
        all_predictions: dict[str, list[list[float]]] = {
            comp_id: [] for comp_id in compartment_ids
        }

        compartment_idx_map = {
            bin.id: idx
            for idx, bin in enumerate(self.simulation.model_definition.population.bins)
        }

        for predictions_per_param_set in all_predictions_raw:
            # predictions_per_param_set is list[list[float]]
            # where [time_step][compartment_idx]
            for comp_id in compartment_ids:
                comp_idx = compartment_idx_map[comp_id]
                trajectory = [
                    predictions_per_param_set[t][comp_idx]
                    for t in range(len(predictions_per_param_set))
                ]
                all_predictions[comp_id].append(trajectory)

        return all_predictions

    def calculate_prediction_intervals(
        self,
        all_predictions: dict[str, list[list[float]]],
        compartment_ids: list[str],
    ) -> tuple[dict[str, list[float]], dict[str, list[float]], dict[str, list[float]]]:
        """Calculate median and confidence intervals from ensemble predictions.

        Parameters
        ----------
        all_predictions : dict[str, list[list[float]]]
            Dictionary mapping compartment IDs to list of prediction trajectories
        compartment_ids : list[str]
            List of compartment IDs

        Returns
        -------
        tuple[dict[str, list[float]], dict[str, list[float]], dict[str, list[float]]]
            Tuple of (median, lower CI, upper CI) dictionaries
        """
        prediction_median: dict[str, list[float]] = {}
        prediction_ci_lower: dict[str, list[float]] = {}
        prediction_ci_upper: dict[str, list[float]] = {}

        ci_lower_percentile = (1.0 - self.confidence_level) / 2.0 * 100
        ci_upper_percentile = (1.0 + self.confidence_level) / 2.0 * 100

        for comp_id in compartment_ids:
            predictions_array = np.array(all_predictions[comp_id])
            prediction_median[comp_id] = np.median(predictions_array, axis=0).tolist()
            prediction_ci_lower[comp_id] = np.percentile(
                predictions_array, ci_lower_percentile, axis=0
            ).tolist()
            prediction_ci_upper[comp_id] = np.percentile(
                predictions_array, ci_upper_percentile, axis=0
            ).tolist()

        return prediction_median, prediction_ci_lower, prediction_ci_upper

    def calculate_coverage_metrics(
        self,
        prediction_ci_lower: dict[str, list[float]],
        prediction_ci_upper: dict[str, list[float]],
    ) -> tuple[float, float]:
        """Calculate coverage percentage and average CI width.

        Parameters
        ----------
        prediction_ci_lower : dict[str, list[float]]
            Lower CI bounds for each compartment
        prediction_ci_upper : dict[str, list[float]]
            Upper CI bounds for each compartment

        Returns
        -------
        tuple[float, float]
            Tuple of (coverage_percentage, average_ci_width)
        """
        points_in_ci = 0
        total_points = len(self.problem.observed_data)
        total_ci_width = 0.0

        for obs in self.problem.observed_data:
            comp_id = obs.compartment
            step = obs.step
            observed_value = obs.value

            if comp_id in prediction_ci_lower and step < len(
                prediction_ci_lower[comp_id]
            ):
                ci_lower = prediction_ci_lower[comp_id][step]
                ci_upper = prediction_ci_upper[comp_id][step]

                if ci_lower <= observed_value <= ci_upper:
                    points_in_ci += 1

                total_ci_width += ci_upper - ci_lower

        coverage_percentage = (
            (points_in_ci / total_points * 100) if total_points > 0 else 0.0
        )
        average_ci_width = total_ci_width / total_points if total_points > 0 else 0.0

        return coverage_percentage, average_ci_width

Functions

calculate_parameter_statistics

calculate_parameter_statistics(ensemble_params: list[CalibrationEvaluation]) -> dict[str, ParameterSetStatistics]

Calculate statistics for each parameter across the ensemble.

Parameters:

Name Type Description Default
ensemble_params list[CalibrationEvaluation]

List of parameter sets in the ensemble

required

Returns:

Type Description
dict[str, ParameterSetStatistics]

Dictionary mapping parameter names to their statistics

Source code in commol/api/probabilistic/statistics_calculator.py
def calculate_parameter_statistics(
    self,
    ensemble_params: list[CalibrationEvaluation],
) -> dict[str, ParameterSetStatistics]:
    """Calculate statistics for each parameter across the ensemble.

    Parameters
    ----------
    ensemble_params : list[CalibrationEvaluation]
        List of parameter sets in the ensemble

    Returns
    -------
    dict[str, ParameterSetStatistics]
        Dictionary mapping parameter names to their statistics
    """
    param_names = ensemble_params[0].parameter_names
    param_values = {
        name: [p.parameters[i] for p in ensemble_params]
        for i, name in enumerate(param_names)
    }

    # Calculate percentile bounds based on confidence level
    ci_lower_percentile = (1.0 - self.confidence_level) / 2.0 * 100
    ci_upper_percentile = (1.0 + self.confidence_level) / 2.0 * 100

    param_statistics = {}
    for name, values in param_values.items():
        param_statistics[name] = ParameterSetStatistics(
            mean=float(np.mean(values)),
            median=float(np.median(values)),
            std=float(np.std(values)),
            percentile_lower=float(np.percentile(values, ci_lower_percentile)),
            percentile_upper=float(np.percentile(values, ci_upper_percentile)),
            min=float(np.min(values)),
            max=float(np.max(values)),
        )
    return param_statistics

generate_ensemble_predictions

generate_ensemble_predictions(ensemble_params: list[CalibrationEvaluation], compartment_ids: list[str], time_steps: int) -> dict[str, list[list[float]]]

Generate predictions for each ensemble member in parallel using Rust.

Parameters:

Name Type Description Default
ensemble_params list[CalibrationEvaluation]

List of parameter sets in the ensemble

required
compartment_ids list[str]

List of compartment IDs to generate predictions for

required
time_steps int

Number of time steps to simulate

required

Returns:

Type Description
dict[str, list[list[float]]]

Dictionary mapping compartment IDs to list of prediction trajectories

Source code in commol/api/probabilistic/statistics_calculator.py
def generate_ensemble_predictions(
    self,
    ensemble_params: list[CalibrationEvaluation],
    compartment_ids: list[str],
    time_steps: int,
) -> dict[str, list[list[float]]]:
    """Generate predictions for each ensemble member in parallel using Rust.

    Parameters
    ----------
    ensemble_params : list[CalibrationEvaluation]
        List of parameter sets in the ensemble
    compartment_ids : list[str]
        List of compartment IDs to generate predictions for
    time_steps : int
        Number of time steps to simulate

    Returns
    -------
    dict[str, list[list[float]]]
        Dictionary mapping compartment IDs to list of prediction trajectories
    """
    param_names = ensemble_params[0].parameter_names

    # Extract parameter sets
    parameter_sets = [ep.parameters for ep in ensemble_params]

    # Call Rust function to generate all predictions in parallel
    all_predictions_raw = commol_rs.calibration.generate_predictions_parallel(
        self.simulation.engine,
        parameter_sets,
        param_names,
        time_steps,
    )

    # Reorganize predictions by compartment
    # all_predictions_raw is list[list[list[float]]] where:
    # - outer list: one per parameter set
    # - middle list: one per time step
    # - inner list: one per compartment
    all_predictions: dict[str, list[list[float]]] = {
        comp_id: [] for comp_id in compartment_ids
    }

    compartment_idx_map = {
        bin.id: idx
        for idx, bin in enumerate(self.simulation.model_definition.population.bins)
    }

    for predictions_per_param_set in all_predictions_raw:
        # predictions_per_param_set is list[list[float]]
        # where [time_step][compartment_idx]
        for comp_id in compartment_ids:
            comp_idx = compartment_idx_map[comp_id]
            trajectory = [
                predictions_per_param_set[t][comp_idx]
                for t in range(len(predictions_per_param_set))
            ]
            all_predictions[comp_id].append(trajectory)

    return all_predictions

calculate_prediction_intervals

calculate_prediction_intervals(all_predictions: dict[str, list[list[float]]], compartment_ids: list[str]) -> tuple[dict[str, list[float]], dict[str, list[float]], dict[str, list[float]]]

Calculate median and confidence intervals from ensemble predictions.

Parameters:

Name Type Description Default
all_predictions dict[str, list[list[float]]]

Dictionary mapping compartment IDs to list of prediction trajectories

required
compartment_ids list[str]

List of compartment IDs

required

Returns:

Type Description
tuple[dict[str, list[float]], dict[str, list[float]], dict[str, list[float]]]

Tuple of (median, lower CI, upper CI) dictionaries

Source code in commol/api/probabilistic/statistics_calculator.py
def calculate_prediction_intervals(
    self,
    all_predictions: dict[str, list[list[float]]],
    compartment_ids: list[str],
) -> tuple[dict[str, list[float]], dict[str, list[float]], dict[str, list[float]]]:
    """Calculate median and confidence intervals from ensemble predictions.

    Parameters
    ----------
    all_predictions : dict[str, list[list[float]]]
        Dictionary mapping compartment IDs to list of prediction trajectories
    compartment_ids : list[str]
        List of compartment IDs

    Returns
    -------
    tuple[dict[str, list[float]], dict[str, list[float]], dict[str, list[float]]]
        Tuple of (median, lower CI, upper CI) dictionaries
    """
    prediction_median: dict[str, list[float]] = {}
    prediction_ci_lower: dict[str, list[float]] = {}
    prediction_ci_upper: dict[str, list[float]] = {}

    ci_lower_percentile = (1.0 - self.confidence_level) / 2.0 * 100
    ci_upper_percentile = (1.0 + self.confidence_level) / 2.0 * 100

    for comp_id in compartment_ids:
        predictions_array = np.array(all_predictions[comp_id])
        prediction_median[comp_id] = np.median(predictions_array, axis=0).tolist()
        prediction_ci_lower[comp_id] = np.percentile(
            predictions_array, ci_lower_percentile, axis=0
        ).tolist()
        prediction_ci_upper[comp_id] = np.percentile(
            predictions_array, ci_upper_percentile, axis=0
        ).tolist()

    return prediction_median, prediction_ci_lower, prediction_ci_upper

calculate_coverage_metrics

calculate_coverage_metrics(prediction_ci_lower: dict[str, list[float]], prediction_ci_upper: dict[str, list[float]]) -> tuple[float, float]

Calculate coverage percentage and average CI width.

Parameters:

Name Type Description Default
prediction_ci_lower dict[str, list[float]]

Lower CI bounds for each compartment

required
prediction_ci_upper dict[str, list[float]]

Upper CI bounds for each compartment

required

Returns:

Type Description
tuple[float, float]

Tuple of (coverage_percentage, average_ci_width)

Source code in commol/api/probabilistic/statistics_calculator.py
def calculate_coverage_metrics(
    self,
    prediction_ci_lower: dict[str, list[float]],
    prediction_ci_upper: dict[str, list[float]],
) -> tuple[float, float]:
    """Calculate coverage percentage and average CI width.

    Parameters
    ----------
    prediction_ci_lower : dict[str, list[float]]
        Lower CI bounds for each compartment
    prediction_ci_upper : dict[str, list[float]]
        Upper CI bounds for each compartment

    Returns
    -------
    tuple[float, float]
        Tuple of (coverage_percentage, average_ci_width)
    """
    points_in_ci = 0
    total_points = len(self.problem.observed_data)
    total_ci_width = 0.0

    for obs in self.problem.observed_data:
        comp_id = obs.compartment
        step = obs.step
        observed_value = obs.value

        if comp_id in prediction_ci_lower and step < len(
            prediction_ci_lower[comp_id]
        ):
            ci_lower = prediction_ci_lower[comp_id][step]
            ci_upper = prediction_ci_upper[comp_id][step]

            if ci_lower <= observed_value <= ci_upper:
                points_in_ci += 1

            total_ci_width += ci_upper - ci_lower

    coverage_percentage = (
        (points_in_ci / total_points * 100) if total_points > 0 else 0.0
    )
    average_ci_width = total_ci_width / total_points if total_points > 0 else 0.0

    return coverage_percentage, average_ci_width

options: show_root_heading: true show_source: false heading_level: 3 members: - calculate_parameter_statistics - generate_ensemble_predictions - calculate_prediction_intervals - calculate_coverage_metrics