Source code for aquamvs.pipeline.interfaces

"""Protocol interfaces for pipeline abstraction."""

import logging
from collections.abc import Iterator
from typing import Protocol, runtime_checkable

import numpy as np
import torch

from ..calibration import CameraData

logger = logging.getLogger(__name__)


[docs] @runtime_checkable class FrameSource(Protocol): """Protocol for frame iteration over multi-camera video or image sequences. Abstracts both VideoSet and ImageDirectorySet to allow uniform iteration over synchronized frames from multiple cameras. """
[docs] def iterate_frames( self, start: int, stop: int | None, step: int ) -> Iterator[tuple[int, dict[str, np.ndarray]]]: """Iterate over synchronized frames from all cameras. Args: start: Starting frame index. stop: Ending frame index (exclusive), or None for all remaining frames. step: Frame step size (1 = every frame, 2 = every other frame, etc.). Yields: Tuple of (frame_idx, images) where images is a dict mapping camera name to raw BGR image (H, W, 3) uint8 array. May contain None values for cameras that failed to read. """ ...
def __enter__(self): """Enter context manager.""" ... def __exit__(self, exc_type, exc_val, exc_tb): """Exit context manager.""" ...
[docs] @runtime_checkable class CalibrationProvider(Protocol): """Protocol for providing calibration data to the pipeline. Defines the interface for accessing camera calibration parameters and refractive geometry. The existing CalibrationData class already satisfies this protocol structurally — no modifications needed. """ @property def cameras(self) -> dict[str, CameraData]: """Per-camera calibration data, keyed by camera name.""" ... @property def water_z(self) -> float: """Z-coordinate of the water surface in world frame (meters).""" ... @property def n_water(self) -> float: """Refractive index of water.""" ... @property def n_air(self) -> float: """Refractive index of air.""" ... @property def interface_normal(self) -> torch.Tensor: """Interface normal vector, shape (3,), float32.""" ... @property def ring_cameras(self) -> list[str]: """Names of non-auxiliary cameras (sorted for determinism).""" ... @property def auxiliary_cameras(self) -> list[str]: """Names of auxiliary cameras (sorted for determinism).""" ...
[docs] def camera_positions(self) -> dict[str, torch.Tensor]: """World-frame camera centers, computed as C = -R^T @ t. Returns: Dictionary mapping camera names to their centers in world frame. Each center is a tensor of shape (3,), float32. """ ...
def ensure_refractive_params(provider: CalibrationProvider) -> CalibrationProvider: """Ensure calibration has valid refractive parameters. Checks if water_z, n_water, and n_air are non-trivial (not zero or 1.0). If missing or set to trivial values, logs a descriptive warning and returns a provider that uses n_air = n_water = 1.0 (refraction-naive fallback). This allows the pipeline to operate on calibrations that lack refractive data (e.g., air-only setups or pinhole-only calibrations) without errors. Args: provider: Original calibration provider. Returns: CalibrationProvider with valid refractive parameters. Either the original provider (if params are valid) or a thin wrapper that overrides n_air and n_water to 1.0. """ # Check for missing or trivial refractive parameters has_water_z = provider.water_z is not None and provider.water_z != 0.0 has_n_water = provider.n_water is not None and provider.n_water != 1.0 has_n_air = provider.n_air is not None and provider.n_air != 1.0 if has_water_z and has_n_water and has_n_air: # All parameters present and non-trivial return provider # Missing or trivial parameters — warn and fallback logger.warning( "Calibration missing refractive parameters (water_z=%s, n_water=%s, n_air=%s). " "Falling back to refraction-naive mode (n_air=n_water=1.0). " "This is suitable for air-only setups or testing, but will produce incorrect " "results for underwater geometry.", provider.water_z, provider.n_water, provider.n_air, ) # Create a simple wrapper that overrides n_air and n_water class RefractionNaiveProvider: """Wrapper that forces n_air = n_water = 1.0 for refraction-naive mode.""" def __init__(self, original: CalibrationProvider): self._original = original @property def cameras(self) -> dict[str, CameraData]: return self._original.cameras @property def water_z(self) -> float: return self._original.water_z @property def n_water(self) -> float: return 1.0 @property def n_air(self) -> float: return 1.0 @property def interface_normal(self) -> torch.Tensor: return self._original.interface_normal @property def ring_cameras(self) -> list[str]: return self._original.ring_cameras @property def auxiliary_cameras(self) -> list[str]: return self._original.auxiliary_cameras def camera_positions(self) -> dict[str, torch.Tensor]: return self._original.camera_positions() return RefractionNaiveProvider(provider)