"""Implementation of the DriftCalcP protocol."""
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Optional
import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency, wasserstein_distance
@dataclass
class DriftCalculator:
    """Implementation of DriftCalcP using Cramér's V and Wasserstein distance.

    Categorical features are scored with Cramér's V over a (categories x 2)
    contingency table of reference vs. current counts; continuous features are
    scored with the 1-D Wasserstein distance between the raw samples.

    Attributes:
        df1: Reference dataset.
        df2: Current dataset.
        kind: Optional mapping of column name to "categorical" or "continuous".
            Columns absent from the mapping have their type inferred from data.
    """

    df1: pd.DataFrame
    df2: pd.DataFrame
    kind: Optional[Dict[str, str]] = None
    # Resolved type ("categorical" / "continuous") of every common column;
    # populated by __post_init__, not an __init__ argument.
    _feature_types: Dict[str, str] = field(init=False)

    # Output schemas. Unannotated, so dataclass does not turn them into fields.
    _DRIFT_COLUMNS = ("feature", "type", "score", "raw_score")
    _DENSITY_COLUMNS = ("feature", "bin", "ref_density", "cur_density")

    def __post_init__(self) -> None:
        """Validate the inputs and resolve the type of every common column.

        Raises:
            ValueError: If there are no common columns between the reference and current datasets.
            TypeError: If `kind` is neither None nor a dict.
        """
        if not self._get_common_columns():
            raise ValueError("No common columns between the reference and current datasets.")
        self._validate_kind()
        self._feature_types = self._determine_feature_types()

    def _validate_kind(self) -> None:
        """Validate that 'kind' is either None or a dictionary.

        Raises:
            TypeError: If `kind` is not None or a dict.
        """
        if self.kind is not None and not isinstance(self.kind, dict):
            raise TypeError("`kind` must be None or a dict mapping column names to 'continuous' or 'categorical'.")

    def _get_common_columns(self) -> list:
        """Get the columns present in both df1 and df2, in df1's column order.

        A plain set intersection would yield an order that changes between
        interpreter runs (string hash randomisation), which would make the row
        order of the drift report non-deterministic.

        Returns:
            List of unique column names present in both dataframes.
        """
        other = set(self.df2.columns)
        # dict.fromkeys de-duplicates while keeping first-seen order.
        return list(dict.fromkeys(c for c in self.df1.columns if c in other))

    def _infer_type_from_data(self, col: str) -> str:
        """Infer a feature type from the data in both datasets.

        A column is "continuous" only when it is numeric in both datasets and
        has more than 20 distinct values in each; everything else is
        "categorical".

        Args:
            col: Column name to infer the type for.

        Returns:
            str: "categorical" or "continuous".
        """
        left, right = self.df1[col], self.df2[col]
        if not (pd.api.types.is_numeric_dtype(left) and pd.api.types.is_numeric_dtype(right)):
            return "categorical"
        # Low-cardinality numeric columns are treated as categories.
        nunique = min(left.nunique(), right.nunique())
        return "categorical" if nunique <= 20 else "continuous"

    def _get_feature_type(self, col: str) -> str:
        """Get the feature type for a single column.

        An explicit entry in `kind` wins; otherwise the type is inferred.

        Args:
            col: Column name to get the type for.

        Returns:
            str: The type from `kind` if given, else "categorical" or "continuous".
        """
        if isinstance(self.kind, dict):
            t = self.kind.get(col, None)
            if t is not None:
                return t
        return self._infer_type_from_data(col)

    def _determine_feature_types(self) -> Dict[str, str]:
        """Determine the type of every common column.

        Returns:
            Dictionary mapping column names to their types.

        Raises:
            TypeError: If `kind` is neither None nor a dict.
        """
        self._validate_kind()
        return {col: self._get_feature_type(col) for col in self._get_common_columns()}

    def _select_columns(self, columns: Optional[Iterable[str]]) -> list:
        """Resolve a user-supplied `columns` argument to known common columns.

        None selects every common column. A single string is treated as one
        column name instead of being iterated character by character. Names
        that are not common columns are silently ignored.
        """
        if columns is None:
            return list(self._feature_types)
        if isinstance(columns, str):
            columns = [columns]
        return [c for c in columns if c in self._feature_types]

    @staticmethod
    def _sorted_categories(*series: pd.Series) -> list:
        """Return the distinct values of all `series`, sorted when possible.

        Object columns may mix types that cannot be ordered (e.g. str and
        int); in that case the first-seen order is returned instead of
        raising TypeError.
        """
        seen = list(dict.fromkeys(v for s in series for v in s.unique()))
        try:
            return sorted(seen)
        except TypeError:
            return seen

    def __call__(self, columns: Optional[Iterable[str]] = None, bins: int = 10, **kwargs: Any) -> pd.DataFrame:
        """Calculate drift metrics between the reference and current datasets.

        Args:
            columns: Columns to evaluate (a single name is accepted). Defaults
                to every common column; unknown names are ignored.
            bins: Forwarded to the continuous metric (unused by Wasserstein).
            **kwargs: Accepted for protocol compatibility; ignored.

        Returns:
            pd.DataFrame: One row per feature with columns
                - feature: Name of the feature
                - type: Metric used (cramer_v, wasserstein, or N/A (Empty Data))
                - score: Drift score (for Wasserstein, this is the raw score)
                - raw_score: Unnormalized drift metric value
            An empty selection yields an empty frame with these columns.

        Raises:
            ValueError: If a selected column has a type other than
                "categorical" or "continuous".
        """
        cols = self._select_columns(columns)

        def drift_row(col: str) -> dict:
            s1, s2 = self.df1[col].dropna(), self.df2[col].dropna()
            t = self._feature_types[col]
            if s1.empty or s2.empty:
                return dict(feature=col, type="N/A (Empty Data)", score=np.nan, raw_score=np.nan)
            if t == "categorical":
                v = self._categorical_drift_calc(s1, s2)
                return dict(feature=col, type="cramer_v", score=v, raw_score=v)
            if t == "continuous":
                v = self._continuous_drift_calc(s1, s2, bins=bins)
                return dict(feature=col, type="wasserstein", score=v, raw_score=v)
            raise ValueError(f"Unknown column type '{t}' for column '{col}'")

        return pd.DataFrame([drift_row(col) for col in cols], columns=list(self._DRIFT_COLUMNS))

    def get_prob_density(self, columns: Optional[Iterable[str]] = None, bins: int = 10) -> pd.DataFrame:
        """Get per-bin probability mass for the reference and current datasets.

        Args:
            columns: Columns to evaluate (a single name is accepted). Defaults
                to every common column; unknown names are ignored.
            bins: Number of equal-width bins for continuous features.

        Returns:
            pd.DataFrame: One row per (feature, bin) with columns
                - feature: Name of the feature
                - bin: Category value, or a numeric range label for continuous features
                - ref_density: Share of reference rows falling in the bin
                - cur_density: Share of current rows falling in the bin
            Columns of an unknown type are skipped. When nothing is produced,
            an empty frame with these columns is returned.
        """
        frames = []
        for col in self._select_columns(columns):
            s1, s2 = self.df1[col].dropna(), self.df2[col].dropna()
            t = self._feature_types[col]
            if t == "categorical":
                d = self._categorical_density(s1, s2)
            elif t == "continuous":
                if s1.empty and s2.empty:
                    continue
                d = self._continuous_density(s1, s2, bins)
            else:
                continue
            if d.empty:
                continue
            d["feature"] = col
            frames.append(d[list(self._DENSITY_COLUMNS)])
        if not frames:
            return pd.DataFrame(columns=list(self._DENSITY_COLUMNS))
        return pd.concat(frames, ignore_index=True)

    @classmethod
    def _categorical_density(cls, s1: pd.Series, s2: pd.Series) -> pd.DataFrame:
        """Relative frequency of each category in `s1` and `s2` (0 when absent)."""
        cats = cls._sorted_categories(s1, s2)
        ref = s1.value_counts(normalize=True).reindex(cats, fill_value=0)
        cur = s2.value_counts(normalize=True).reindex(cats, fill_value=0)
        return pd.DataFrame({"bin": cats, "ref_density": ref.values, "cur_density": cur.values})

    @staticmethod
    def _continuous_density(s1: pd.Series, s2: pd.Series, bins: int) -> pd.DataFrame:
        """Share of each sample in `bins` equal-width bins spanning both samples.

        At least one of `s1` / `s2` must be non-empty; an empty side gets all
        zero shares instead of NaN.
        """
        a, b = s1.to_numpy(dtype=float), s2.to_numpy(dtype=float)
        both = np.concatenate([a, b])
        lo, hi = both.min(), both.max()
        if lo == hi:
            # Constant data: zero-width bins would make every share NaN, so
            # widen the range the same way np.histogram does for range=(x, x).
            lo, hi = lo - 0.5, hi + 0.5
        edges = np.linspace(lo, hi, bins + 1)
        ref_counts, _ = np.histogram(a, bins=edges)
        cur_counts, _ = np.histogram(b, bins=edges)
        labels = [f"({left:.2f}-{right:.2f}]" for left, right in zip(edges[:-1], edges[1:])]
        return pd.DataFrame({
            "bin": labels,
            # max(..., 1) keeps an empty sample at all-zero shares.
            "ref_density": ref_counts / max(ref_counts.sum(), 1),
            "cur_density": cur_counts / max(cur_counts.sum(), 1),
        })

    @classmethod
    def _categorical_drift_calc(cls, s1: pd.Series, s2: pd.Series) -> float:
        """Cramér's V between the category distributions of two samples.

        Args:
            s1: Reference series.
            s2: Current series.

        Returns:
            float: Cramér's V between 0 (no drift) and 1 (max drift). Two empty
            series give 0.0; exactly one empty series gives 1.0.
        """
        # Quick check for identical or empty series
        if s1.equals(s2) or (s1.empty and s2.empty):
            return 0.0
        if s1.empty or s2.empty:
            return 1.0
        cats = cls._sorted_categories(s1, s2)
        if len(cats) < 2:
            # Both samples are 100% the same single category: identical
            # distributions, regardless of how many rows each has.
            return 0.0
        table = np.column_stack([
            s1.value_counts().reindex(cats, fill_value=0).to_numpy(),
            s2.value_counts().reindex(cats, fill_value=0).to_numpy(),
        ])
        # Every row and column total is positive here, so chi2_contingency
        # cannot hit zero expected frequencies. Pearson's chi2 without Yates'
        # correction is the statistic Cramér's V is defined on; the default
        # correction would apply only to 2-category features.
        chi2, _, _, _ = chi2_contingency(table, correction=False)
        n = table.sum()
        min_dim = min(table.shape[0] - 1, table.shape[1] - 1)
        v = np.sqrt(chi2 / (n * min_dim))
        return float(max(0.0, min(1.0, v)))

    @classmethod
    def _continuous_drift_calc(cls, s1: pd.Series, s2: pd.Series, bins: int = 10) -> float:
        """Wasserstein distance between two numeric samples.

        Args:
            s1: Reference series.
            s2: Current series.
            bins: Number of bins (not used, for protocol compatibility).

        Returns:
            float: Wasserstein distance between the two distributions.
            Returns 0.0 if both are empty, 1.0 if only one is empty.
        """
        if s1.empty or s2.empty:
            return 0.0 if s1.empty and s2.empty else 1.0
        return wasserstein_distance(s1.values, s2.values)