Source code for tab_right.drift.univariate

"""Univariate drift detection utilities for tab-right drift subpackage."""

from dataclasses import dataclass
from typing import Any, Dict, Tuple, Union

import numpy as np
import pandas as pd
import scipy.stats

from tab_right.drift.cramer_v import cramer_v


def normalize_wasserstein(
    reference: pd.Series, current: pd.Series, wasserstein_value: float, method: str = "range"
) -> float:
    """Normalize Wasserstein distance to make it more comparable across features with different scales.

    Parameters
    ----------
    reference : pd.Series
        Reference distribution.
    current : pd.Series
        Current distribution.
    wasserstein_value : float
        Raw Wasserstein distance value to normalize.
    method : str, default "range"
        Normalization method:
        - "range": Divide by the combined range of both distributions
        - "std": Divide by the pooled standard deviation
        - "iqr": Divide by the pooled interquartile range

    Returns
    -------
    float
        Normalized drift score between 0 and 1 in most practical cases.
        NaN if ``wasserstein_value`` is NaN; 0.0 if ``wasserstein_value`` is 0
        or if the normalization factor is zero or undefined (constant, empty
        or all-missing data), regardless of ``method``.

    Raises
    ------
    ValueError
        If an unknown normalization method is provided. The method is checked
        before anything else, so a bad value is reported even when the
        distance is 0 or NaN.

    """
    # Validate up front so a misconfigured method fails consistently instead of
    # only on features that happen to have non-zero drift.
    if method not in ("range", "std", "iqr"):
        raise ValueError(f"Unknown normalization method: {method}")

    # Check for missing values first: comparing pd.NA with 0 yields pd.NA, whose
    # truth value is ambiguous.
    if pd.isna(wasserstein_value):
        return np.nan
    if wasserstein_value == 0:
        return 0.0

    # Pool both samples so the scale reflects the data the distance was computed on.
    combined = pd.concat([reference, current])

    if method == "range":
        normalization_factor = combined.max() - combined.min()
    elif method == "std":
        normalization_factor = combined.std()
    else:  # method == "iqr"
        q75, q25 = np.nanpercentile(combined, [75, 25])
        normalization_factor = q75 - q25

    # One degenerate-scale rule for every method: no spread, or not enough data
    # to measure it, means there is nothing to scale against.
    if pd.isna(normalization_factor) or normalization_factor == 0:
        return 0.0

    return wasserstein_value / normalization_factor


@dataclass
class UnivariateDriftCalculator:
    """Calculate univariate drift between two DataFrames.

    This class implements the DriftCalc protocol and provides methods for
    detecting drift between two DataFrames using column-by-column analysis.

    Parameters
    ----------
    df1 : pd.DataFrame
        The reference DataFrame
    df2 : pd.DataFrame
        The current DataFrame to compare against the reference
    kind : Union[None, Dict[str, str]], default None
        How to treat columns:
        - None: Infer from data types
        - dict: Specify "continuous" or "categorical" for each column;
          columns missing from the dict are inferred
    normalize : bool, default True
        Whether to normalize continuous drift scores
    normalization_method : str, default "range"
        Method to use for normalization, see normalize_wasserstein for options

    Notes
    -----
    Inference rule: non-numeric columns are categorical. A numeric column is
    categorical when either DataFrame has at most 20 distinct non-null values
    in it, and continuous otherwise.

    """

    df1: pd.DataFrame
    df2: pd.DataFrame
    kind: Union[None, Dict[str, str]] = None
    normalize: bool = True
    normalization_method: str = "range"

    def __post_init__(self) -> None:
        """Post-initialization: enforce kind protocol at instantiation.

        Raises
        ------
        ValueError
            If kind is not None and not a dict mapping column names to 'continuous' or 'categorical'.

        """
        self._check_kind()

    def _check_kind(self) -> None:
        """Raise ValueError unless ``kind`` is None or a dict."""
        if self.kind is not None and not isinstance(self.kind, dict):
            raise ValueError("kind must be None or a dict mapping column names to 'continuous' or 'categorical'.")

    def _common_columns(self) -> list:
        """Return column labels present in both DataFrames, deduplicated, in df1's column order."""
        # A list (not a set) keeps the output row order deterministic across runs;
        # set iteration order for string labels depends on hash randomization.
        in_current = set(self.df2.columns)
        return list(dict.fromkeys(col for col in self.df1.columns if col in in_current))

    def _infer_kind(self, col: Any) -> str:
        """Infer 'categorical' or 'continuous' for a column present in both DataFrames."""
        if not pd.api.types.is_numeric_dtype(self.df1[col]):
            return "categorical"
        # Numeric columns with few distinct values are compared as categories.
        nunique = min(self.df1[col].nunique(), self.df2[col].nunique())
        return "categorical" if nunique <= 20 else "continuous"

    def __call__(self) -> pd.DataFrame:
        """Calculate drift between two DataFrames.

        Returns
        -------
        pd.DataFrame
            DataFrame with drift results for each feature: one row per column
            shared by ``df1`` and ``df2`` (in ``df1`` column order), holding the
            keys returned by ``detect_univariate_drift_with_options`` plus
            ``"feature"``. Empty if the DataFrames share no columns.

        Raises
        ------
        ValueError
            If kind is not None and not a dict mapping column names to 'continuous' or 'categorical'.

        """
        # The dataclass is mutable, so kind may have been reassigned since
        # __post_init__; validate again before using it.
        self._check_kind()
        overrides = self.kind or {}

        results = []
        for col in self._common_columns():
            col_kind = overrides.get(col)
            if col_kind is None:
                col_kind = self._infer_kind(col)
            result_dict = detect_univariate_drift_with_options(
                self.df1[col],
                self.df2[col],
                kind=col_kind,
                normalize=self.normalize,
                normalization_method=self.normalization_method,
            )
            result_dict["feature"] = col
            results.append(result_dict)

        return pd.DataFrame(results)


def detect_univariate_drift_with_options(
    reference: pd.Series,
    current: pd.Series,
    kind: str = "auto",
    normalize: bool = True,
    normalization_method: str = "range",
) -> Dict[str, Any]:
    """Detect drift between two 1D distributions with normalization options.

    Parameters
    ----------
    reference : pd.Series
        Reference distribution.
    current : pd.Series
        Current distribution.
    kind : str, default "auto"
        "auto", "categorical", or "continuous". If "auto", infers from dtype:
        numeric dtypes are treated as continuous, everything else as categorical.
    normalize : bool, default True
        Whether to normalize continuous drift scores
    normalization_method : str, default "range"
        Method to use for normalization, see normalize_wasserstein for options

    Returns
    -------
    Dict[str, Any]
        Dictionary with keys:
        - "type": Metric name (wasserstein or cramer_v)
        - "score": Drift score (normalized for continuous if normalize=True)
        - "raw_score": Unnormalized drift score (only for continuous)

    Raises
    ------
    ValueError
        If kind is not recognized. May also propagate from
        normalize_wasserstein when normalization_method is unknown.

    """
    if kind == "auto":
        kind = "continuous" if pd.api.types.is_numeric_dtype(reference) else "categorical"

    if kind == "continuous":
        raw_score = scipy.stats.wasserstein_distance(reference.to_numpy(), current.to_numpy())
        if normalize:
            score = normalize_wasserstein(reference, current, raw_score, method=normalization_method)
        else:
            score = raw_score
        # Key order (type, raw_score, score) is kept stable; it determines the
        # column order of DataFrames built from these dicts.
        return {"type": "wasserstein", "raw_score": raw_score, "score": score}

    if kind == "categorical":
        # Cramer's V is already normalized between 0 and 1
        return {"type": "cramer_v", "score": cramer_v(reference, current)}

    raise ValueError(f"Unknown kind: {kind!r}. Expected 'auto', 'categorical' or 'continuous'.")


def detect_univariate_drift(
    reference: pd.Series,
    current: pd.Series,
    kind: str = "auto",
    normalize: bool = True,
    normalization_method: str = "range",
) -> Tuple[str, float]:
    """Detect drift between two 1D distributions.

    Tuple-returning convenience wrapper around
    ``detect_univariate_drift_with_options``.

    Parameters
    ----------
    reference : pd.Series
        Reference distribution.
    current : pd.Series
        Current distribution.
    kind : str, default "auto"
        "auto", "categorical", or "continuous". If "auto", infers from dtype.
    normalize : bool, default True
        Whether to normalize continuous drift scores
    normalization_method : str, default "range"
        Method to use for normalization, see normalize_wasserstein for options

    Returns
    -------
    tuple
        (metric name, value), where value is the "score" entry of the
        underlying result (normalized for continuous features when
        ``normalize`` is True).

    Notes
    -----
    Any ValueError raised by ``detect_univariate_drift_with_options`` (for an
    unrecognized kind or an invalid normalization method) propagates unchanged.

    """
    details = detect_univariate_drift_with_options(
        reference,
        current,
        kind=kind,
        normalize=normalize,
        normalization_method=normalization_method,
    )
    metric_name = details["type"]
    drift_value = details["score"]
    return metric_name, drift_value
def detect_univariate_drift_df(
    reference: pd.DataFrame,
    current: pd.DataFrame,
    kind: str = "auto",
    normalize: bool = True,
    normalization_method: str = "range",
) -> pd.DataFrame:
    """Detect drift for each column in two DataFrames.

    Parameters
    ----------
    reference : pd.DataFrame
        Reference DataFrame.
    current : pd.DataFrame
        Current DataFrame.
    kind : str, default "auto"
        "auto", "categorical", or "continuous". "auto" lets
        UnivariateDriftCalculator infer each column's kind; any other value is
        applied to every column of ``reference``.
    normalize : bool, default True
        Whether to normalize continuous drift scores
    normalization_method : str, default "range"
        Method to use for normalization, see normalize_wasserstein for options

    Returns
    -------
    pd.DataFrame
        DataFrame with columns: feature, metric, value, raw_value (for continuous features).

    Notes
    -----
    This function is provided for backward compatibility. For new code, use
    the UnivariateDriftCalculator class instead.

    """
    # Key the explicit kind by COLUMN labels: the calculator looks kinds up per
    # column, so keys built from row labels would never match and the requested
    # kind would be silently ignored.
    column_kinds = None if kind == "auto" else {col: kind for col in reference.columns}

    # Use the protocol-compliant class for implementation
    drift_calc = UnivariateDriftCalculator(
        df1=reference,
        df2=current,
        kind=column_kinds,
        normalize=normalize,
        normalization_method=normalization_method,
    )
    # Rename columns to match the old API. DataFrame.rename ignores labels that
    # are absent, so "raw_score" (present only when some feature is continuous)
    # needs no special casing.
    return drift_calc().rename(columns={"type": "metric", "score": "value", "raw_score": "raw_value"})