Source code for ot.sgot

# -*- coding: utf-8 -*-
"""
Spectral-Grassmann optimal transport for linear operators.

This module implements the Spectral-Grassmann Wasserstein framework for
comparing dynamical systems via their learned operator representations.

It provides tools to extract spectral "atoms" (eigenvalues and associated
eigenspaces) from linear operators and to compute an optimal transport metric
that combines a spectral term on eigenvalues with a Grassmannian term on
eigenspaces.
"""

# Author: Sienna O'Shea  <osheasienna@gmail.com>
#         Thibaut Germain<thibaut.germain.pro@gmail.com>
# License: MIT License

import ot
from ot.backend import get_backend

#####################################################################################################################################
#####################################################################################################################################
### NORMALIZATION AND OPERATOR ATOMS  ###
#####################################################################################################################################
#####################################################################################################################################


[docs] def eigenvalue_cost_matrix(Ds, Dt, q=1, eigen_scaling=None, nx=None): """Compute pairwise eigenvalue distances for source and target domains. Parameters ---------- Ds: array-like, shape (n_s,) Source eigenvalues. Dt: array-like, shape (n_t,) Target eigenvalues. q: int or float, optional Exponent applied to the eigenvalue distance, default 1. eigen_scaling: None or array-like of length 2, optional Scaling (real_scale, imag_scale) applied to eigenvalues before computing distances. If None, defaults to (1.0, 1.0). Accepts tuple/list or array/tensor with two entries. nx: module, optional Backend (NumPy-compatible). If None, inferred from inputs. Returns ---------- C: np.ndarray, shape (n_s, n_t) Eigenvalue cost matrix. """ if nx is None: nx = get_backend(Ds, Dt) if eigen_scaling is None: real_scale, imag_scale = 1.0, 1.0 else: if isinstance(eigen_scaling, (tuple, list)): real_scale, imag_scale = eigen_scaling else: real_scale, imag_scale = eigen_scaling[0], eigen_scaling[1] Dsn = nx.real(Ds) * real_scale + 1j * nx.imag(Ds) * imag_scale Dtn = nx.real(Dt) * real_scale + 1j * nx.imag(Dt) * imag_scale C_real = nx.real(Dsn)[:, None] - nx.real(Dtn)[None, :] C_real = C_real**2 C_imag = nx.imag(Dsn)[:, None] - nx.imag(Dtn)[None, :] C_imag = C_imag**2 prod = C_real + C_imag return prod ** (q / 2)
def _normalize_columns(A, nx, eps=1e-12): """Normalize the columns of an array with a backend-aware norm. Parameters ---------- A: array-like, shape (d, n) Input array whose columns are normalized. nx: module Backend (NumPy-compatible) used for math operations. eps: float, optional Minimum norm value to avoid division by zero, default 1e-12. Returns ---------- A_norm: array-like, shape (d, n) Column-normalized array. """ nrm = nx.norm(A, axis=0, keepdims=True) nrm = nx.real(nrm) # norm is real; avoid complex dtype for maximum (e.g. torch) nrm = nx.maximum(nrm, eps) return A / nrm def _delta_matrix_1d(Rs, Ls, Rt, Lt, nx=None, eps=1e-12): """Compute the normalized inner-product delta matrix for eigenspaces. Parameters ---------- Rs: array-like, shape (L, n_s) Source right eigenvectors. Ls: array-like, shape (L, n_s) Source left eigenvectors. Rt: array-like, shape (L, n_t) Target right eigenvectors. Lt: array-like, shape (L, n_t) Target left eigenvectors. nx: module, optional Backend (NumPy-compatible). If None, inferred from inputs. eps: float, optional Minimum norm value used in normalization, default 1e-12. Returns ---------- delta: array-like, shape (n_s, n_t) Delta matrix with entries in [0, 1]. """ if nx is None: nx = get_backend(Rs, Ls, Rt, Lt) Rsn = _normalize_columns(Rs, nx=nx, eps=eps) Lsn = _normalize_columns(Ls, nx=nx, eps=eps) Rtn = _normalize_columns(Rt, nx=nx, eps=eps) Ltn = _normalize_columns(Lt, nx=nx, eps=eps) Cr = nx.dot(nx.conj(Rsn).T, Rtn) Cl = nx.dot(nx.conj(Lsn).T, Ltn) delta = nx.abs(Cr * Cl) delta = nx.clip(delta, 0.0, 1.0) return delta def _grassmann_distance_squared( delta, grassmann_metric="chordal", q=1, nx=None, eps=1e-12 ): """Compute Grassmannian distances from delta similarities. Parameters ---------- delta: array-like Similarity values in [0, 1]. grassmann_metric: str, optional Metric type: "geodesic", "chordal", "procrustes", or "martin". q: int or float, optional Exponent applied to the Grassmann distance, in the same spirit as the eigenvalue cost exponent. Default is 1. nx: module, optional Backend (NumPy-compatible). If None, inferred from inputs. eps: float, optional Minimum value used for numerical stability in the Martin metric. Returns ------- dist_q: array-like Grassmannian distances raised to the power q. """ if nx is None: nx = get_backend(delta) if nx.any(delta < 0) or nx.any(delta > 1.0): raise ValueError( "delta must be in [0, 1]; found values outside this range " f"(min={nx.min(delta)}, max={nx.max(delta)})" ) if grassmann_metric == "geodesic": dist2 = nx.arccos(delta) ** 2 elif grassmann_metric == "chordal": dist2 = 1.0 - delta**2 elif grassmann_metric == "procrustes": dist2 = 2.0 * (1.0 - delta) elif grassmann_metric == "martin": delta2 = nx.maximum(delta**2, eps) dist2 = -nx.log(delta2) else: raise ValueError(f"Unknown grassmann_metric: {grassmann_metric}") return nx.real(dist2) ** (q / 2.0) ##################################################################################################################################### ##################################################################################################################################### ### SPECTRAL-GRASSMANNIAN WASSERSTEIN METRIC ### ##################################################################################################################################### #####################################################################################################################################
[docs] def sgot_cost_matrix( Ds, Rs, Ls, Dt, Rt, Lt, eta=0.5, p=2, q=1, grassmann_metric="chordal", eigen_scaling=None, nx=None, eps=1e-12, ): r"""Compute the SGOT cost matrix between two spectral decompositions. This returns the discrete ground cost matrix used in the SGOT optimal transport objective. Each spectral atom is :math:`z_i=(\lambda_i, V_i)` where :math:`\lambda_i \in \mathbb{C}` is an eigenvalue and :math:`V_i` is the associated (bi-orthogonal) eigenspace point. .. math:: C_2(i,j) \;=\; \eta\,C_\lambda(i,j) \;+\; (1-\eta)\,C_G(i,j), with spectral term .. math:: C_\lambda(i,j) \;=\; \big|\lambda_i - \lambda'_j\big|^{q}, and Grassmann term computed from a similarity score :math:`\delta_{ij}\in[0,1]` built from left/right eigenvectors .. math:: \delta_{ij} \;=\; \left|\langle r_i, r'_j\rangle\,\langle \ell_i, \ell'_j\rangle\right|. Depending on ``grassmann_metric``, the Grassmann contribution is: - ``"chordal"``: :math:`C_G(i,j) = 1 - \delta_{ij}^2` - ``"geodesic"``: :math:`C_G(i,j) = \arccos(\delta_{ij})^2` - ``"procrustes"``: :math:`C_G(i,j) = 2(1-\delta_{ij})` - ``"martin"``: :math:`C_G(i,j) = -\log\!\left(\max(\delta_{ij}^2,\varepsilon)\right)` Finally, we return a matrix suited for a :math:`p`-Wasserstein objective by treating :math:`C_2 \approx d^2` and outputting .. math:: C(i,j) \;=\; \big(\operatorname{Re}(C_2(i,j))\big)^{p/2}. Parameters ---------- Ds: array-like, shape (n_s,) Eigenvalues of operator T1. Rs: array-like, shape (L, n_s) Right eigenvectors of operator T1. Ls: array-like, shape (L, n_s) Left eigenvectors of operator T1. Dt: array-like, shape (n_t,) Eigenvalues of operator T2. Rt: array-like, shape (L, n_t) Right eigenvectors of operator T2. Lt: array-like, shape (L, n_t) Left eigenvectors of operator T2. eta: float, optional Weighting between spectral and Grassmann terms, default 0.5. p: int, optional Exponent defining the OT ground cost. The returned cost is :math:`d^p` with :math:`d^2 \approx C_2`. Default is 2. q: int, optional Exponent applied to the eigenvalue distance in the spectral term. Default is 1. grassmann_metric: str, optional Metric type: "geodesic", "chordal", "procrustes", or "martin". eigen_scaling: None or array-like of length 2, optional Scaling ``(real_scale, imag_scale)`` applied to eigenvalues before computing :math:`C_\lambda`. If provided, eigenvalues are transformed as :math:`\lambda \mapsto \alpha\operatorname{Re}(\lambda) + i\,\beta\operatorname{Im}(\lambda)`. If None, defaults to ``(1.0, 1.0)``. Accepts tuple/list or array/tensor with two entries. nx: module, optional Backend (NumPy-compatible). If None, inferred from inputs. eps: float, optional Minimum value used for numerical stability in Grassmann distances and Martin metric. Default is 1e-12. Returns ---------- C: array-like, shape (n_s, n_t) SGOT cost matrix :math:`C = d^p`. References ---------- .. [83] Germain, T., Flamary, R., Kostic, V. R., & Lounici, K. (2025). `A Spectral-Grassmann Wasserstein Metric for Operator Representations of Dynamical Systems <https://arxiv.org/abs/2509.24920>`_. """ if nx is None: nx = get_backend(Ds, Rs, Ls, Dt, Rt, Lt) _validate_sgot_inputs(Ds, Rs, Ls, Dt, Rt, Lt) C_lambda = eigenvalue_cost_matrix(Ds, Dt, q=q, eigen_scaling=eigen_scaling, nx=nx) delta = _delta_matrix_1d(Rs, Ls, Rt, Lt, nx=nx) C_grass = _grassmann_distance_squared( delta, grassmann_metric=grassmann_metric, q=q, nx=nx, eps=eps, ) C2 = eta * C_lambda + (1.0 - eta) * C_grass C = C2 ** (p / 2.0) return C
def _validate_sgot_inputs(Ds, Rs, Ls, Dt, Rt, Lt): """Validate shapes of spectral atoms for SGOT cost/metric.""" Ds_shape = getattr(Ds, "shape", None) Dt_shape = getattr(Dt, "shape", None) Ds_ndim = getattr(Ds, "ndim", None) Dt_ndim = getattr(Dt, "ndim", None) if Ds_ndim != 1 or Dt_ndim != 1: raise ValueError( "SGOT expects Ds and Dt to be 1D (n,), " f"got Ds shape {Ds_shape} and Dt shape {Dt_shape}" ) if Rs.shape != Ls.shape or Rt.shape != Lt.shape: raise ValueError( "Right/left eigenvector shapes must match; got " f"(Rs,Ls)=({Rs.shape},{Ls.shape}), (Rt,Lt)=({Rt.shape},{Lt.shape})" ) if Rs.shape[1] != Ds.shape[0] or Rt.shape[1] != Dt.shape[0]: raise ValueError( "Eigenvector columns must match eigenvalues: " f"Rs {Rs.shape[1]} vs Ds {Ds.shape[0]}, " f"Rt {Rt.shape[1]} vs Dt {Dt.shape[0]}" )
[docs] def sgot_metric( Ds, Rs, Ls, Dt, Rt, Lt, eta=0.5, p=2, q=1, r=2, grassmann_metric="chordal", eigen_scaling=None, Ws=None, Wt=None, nx=None, eps=1e-12, ): r"""Compute the SGOT metric between two spectral decompositions. This function computes a discrete optimal transport problem between two measures over spectral atoms :math:`z_i=(\lambda_i, V_i)` and :math:`z'_j=(\lambda'_j, V'_j)`. Using the ground cost matrix :math:`C = d^p` returned by :func:`sgot_cost_matrix`, we solve: .. math:: P^\star \in \arg\min_{P\in\Pi(W_s, W_t)} \langle C, P\rangle, where :math:`C(i,j) = d(i,j)^p` and :math:`d(i,j)` is the SGOT ground distance combining spectral and Grassmann terms with exponent :math:`q`: .. math:: d(i,j)^2 \;=\; \eta\,\big|\lambda_i - \lambda'_j\big|^{q} \;+\; (1-\eta)\,d_G(i,j)^{q}, and :math:`d_G(i,j)` is the Grassmann distance associated with the chosen ``grassmann_metric``. From the optimal plan :math:`P^\star`, we first form the :math:`p`-Wasserstein objective: .. math:: \mathrm{obj} \;=\; \left(\sum_{i,j} C(i,j)\,P^\star_{ij}\right)^{1/p}, and then apply an outer root :math:`r`: .. math:: \mathrm{SGOT} \;=\; \mathrm{obj}^{1/r}. In summary: - :math:`q` controls how strongly spectral and Grassmann distances are curved (via :math:`|\lambda_i - \lambda'_j|^{q}` and :math:`d_G(i,j)^{q}`), - :math:`p` is the exponent used in the OT ground cost and the inner Wasserstein root, - :math:`r` is an additional outer root applied to the Wasserstein objective. Parameters ---------- Ds: array-like, shape (n_s,) Eigenvalues of operator T1. Rs: array-like, shape (L, n_s) Right eigenvectors of operator T1. Ls: array-like, shape (L, n_s) Left eigenvectors of operator T1. Dt: array-like, shape (n_t,) Eigenvalues of operator T2. Rt: array-like, shape (L, n_t) Right eigenvectors of operator T2. Lt: array-like, shape (L, n_t) Left eigenvectors of operator T2. eta: float, optional Weighting between spectral and Grassmann terms, default 0.5. p: int, optional Exponent defining the OT ground cost, default 2. q: int, optional Exponent applied to the eigenvalue and Grassmann distances, default 1. r: int or float, optional Outer root applied to the Wasserstein objective, default 2. grassmann_metric: str, optional Metric type: "geodesic", "chordal", "procrustes", or "martin". eigen_scaling: None or array-like of length 2, optional Scaling ``(real_scale, imag_scale)`` applied to eigenvalues before computing the spectral cost. If None, defaults to ``(1.0, 1.0)``. Ws: array-like, shape (n_s,), optional Source weights. If None, uniform weights are used. Wt: array-like, shape (n_t,), optional Target weights. If None, uniform weights are used. nx: module, optional Backend (NumPy-compatible). If None, inferred from inputs. eps: float, optional Numerical stability constant, default 1e-12. Returns ------- dist: float SGOT distance between the two spectral decompositions. References ---------- .. [83] Germain, T., Flamary, R., Kostic, V. R., & Lounici, K. (2025). `A Spectral-Grassmann Wasserstein Metric for Operator Representations of Dynamical Systems <https://arxiv.org/abs/2509.24920>`_. """ if nx is None: nx = get_backend(Ds, Rs, Ls, Dt, Rt, Lt) _validate_sgot_inputs(Ds, Rs, Ls, Dt, Rt, Lt) C = sgot_cost_matrix( Ds, Rs, Ls, Dt, Rt, Lt, eta=eta, p=p, q=q, grassmann_metric=grassmann_metric, eigen_scaling=eigen_scaling, nx=nx, eps=eps, ) if Ws is None: Ws = nx.ones((C.shape[0],), type_as=C) / float(C.shape[0]) if Wt is None: Wt = nx.ones((C.shape[1],), type_as=C) / float(C.shape[1]) Ws = Ws / nx.sum(Ws) Wt = Wt / nx.sum(Wt) obj = ot.emd2(Ws, Wt, nx.real(C)) obj = obj ** (1.0 / p) return obj ** (1.0 / float(r))