Source code for ot.dr

# -*- coding: utf-8 -*-
"""
Dimension reduction with OT


.. warning::
    Note that by default the module is not imported in :mod:`ot`. In order to
    use it you need to explicitly import :mod:`ot.dr`

"""

# Author: Remi Flamary <remi.flamary@unice.fr>
#         Minhui Huang <mhhuang@ucdavis.edu>
#         Jakub Zadrozny <jakub.r.zadrozny@gmail.com>
#         Antoine Collas <antoine.collas@inria.fr>
#
# License: MIT License

from scipy import linalg

try:
    import autograd.numpy as np
    from sklearn.decomposition import PCA

    import pymanopt
    import pymanopt.manifolds
    import pymanopt.optimizers
except ImportError:
    raise ImportError(
        "Missing dependency for ot.dr. Requires autograd, pymanopt, scikit-learn. You can install with install with 'pip install POT[dr]', or 'conda install autograd pymanopt scikit-learn'"
    )

from .bregman import sinkhorn as sinkhorn_bregman
from .utils import dist as dist_utils, check_random_state


def dist(x1, x2):
    r"""Compute squared euclidean distance between samples (autograd)"""
    x1p2 = np.sum(np.square(x1), 1)
    x2p2 = np.sum(np.square(x2), 1)
    return x1p2.reshape((-1, 1)) + x2p2.reshape((1, -1)) - 2 * np.dot(x1, x2.T)

def sinkhorn(w1, w2, M, reg, k):
    r"""Sinkhorn algorithm with a fixed number of iterations (autograd)"""
    K = np.exp(-M / reg)
    ui = np.ones((M.shape[0],))
    vi = np.ones((M.shape[1],))
    for i in range(k):
        # alternating scaling updates on the dual variables
        vi = w2 / (np.dot(K.T, ui) + 1e-50)
        ui = w1 / (np.dot(K, vi) + 1e-50)
    G = ui.reshape((M.shape[0], 1)) * K * vi.reshape((1, M.shape[1]))
    return G

def logsumexp(M, axis):
    r"""Log-sum-exp reduction compatible with autograd (no numpy implementation)"""
    amax = np.amax(M, axis=axis, keepdims=True)
    return np.log(np.sum(np.exp(M - amax), axis=axis)) + np.squeeze(amax, axis=axis)

def sinkhorn_log(w1, w2, M, reg, k):
    r"""Sinkhorn algorithm in log-domain with a fixed number of iterations (autograd)"""
    Mr = -M / reg

    ui = np.zeros((M.shape[0],))
    vi = np.zeros((M.shape[1],))

    log_w1 = np.log(w1)
    log_w2 = np.log(w2)

    for i in range(k):
        # alternating updates on the log-domain dual variables
        vi = log_w2 - logsumexp(Mr + ui[:, None], 0)
        ui = log_w1 - logsumexp(Mr + vi[None, :], 1)

    G = np.exp(ui[:, None] + Mr + vi[None, :])
    return G

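# Illustrative sketch (not part of the library): the two fixed-iteration
# solvers above should agree on a well-conditioned problem, while the
# log-domain variant remains usable for small ``reg``. Data and sizes below
# are arbitrary demonstration choices.
def _example_sinkhorn_solvers():
    import numpy

    rng = numpy.random.RandomState(0)
    x1 = rng.randn(5, 3)
    x2 = rng.randn(4, 3)
    M = dist(x1, x2)                                 # squared euclidean cost matrix
    w1 = numpy.ones(5) / 5                           # uniform weights
    w2 = numpy.ones(4) / 4
    G = sinkhorn(w1, w2, M, reg=1.0, k=50)           # standard scaling updates
    G_log = sinkhorn_log(w1, w2, M, reg=1.0, k=50)   # log-domain updates
    print(numpy.abs(G - G_log).max())                # should be close to 0
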
def split_classes(X, y):
    r"""Split samples in :math:`\mathbf{X}` by classes in :math:`\mathbf{y}`"""
    lstsclass = np.unique(y)
    return [X[y == i, :].astype(np.float32) for i in lstsclass]

def fda(X, y, p=2, reg=1e-16):
    r"""Fisher Discriminant Analysis

    Parameters
    ----------
    X : ndarray, shape (n, d)
        Training samples.
    y : ndarray, shape (n,)
        Labels for training samples.
    p : int, optional
        Size of dimensionality reduction.
    reg : float, optional
        Regularization term >0 (ridge regularization)

    Returns
    -------
    P : ndarray, shape (d, p)
        Optimal projection matrix for the given parameters
    proj : callable
        Projection function including mean centering
    """

    mx = np.mean(X)
    X -= mx.reshape((1, -1))

    # data split between classes
    d = X.shape[1]
    xc = split_classes(X, y)
    nc = len(xc)

    p = min(nc - 1, p)

    # within-class covariance
    Cw = 0
    for x in xc:
        Cw += np.cov(x, rowvar=False)
    Cw /= nc

    mxc = np.zeros((d, nc))

    for i in range(nc):
        mxc[:, i] = np.mean(xc[i])

    mx0 = np.mean(mxc, 1)

    # between-class covariance
    Cb = 0
    for i in range(nc):
        Cb += (mxc[:, i] - mx0).reshape((-1, 1)) * (mxc[:, i] - mx0).reshape((1, -1))

    w, V = linalg.eig(Cb, Cw + reg * np.eye(d))

    idx = np.argsort(w.real)

    Popt = V[:, idx[-p:]]

    def proj(X):
        return (X - mx.reshape((1, -1))).dot(Popt)

    return Popt, proj

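# Illustrative sketch (not part of the library): Fisher Discriminant Analysis
# on a toy two-class problem. Sizes and the class shift are arbitrary.
def _example_fda():
    import numpy

    rng = numpy.random.RandomState(0)
    n, d = 50, 5
    X = rng.randn(2 * n, d)
    X[:n, 0] += 4.0                      # shift the first class along one axis
    y = numpy.concatenate([numpy.zeros(n), numpy.ones(n)])
    P, proj = fda(X, y, p=1)             # p is capped at n_classes - 1
    Xp = proj(X)                         # projected (and centered) samples
    print(P.shape, Xp.shape)             # (5, 1) (100, 1)
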
def wda(
    X,
    y,
    p=2,
    reg=1,
    k=10,
    solver=None,
    sinkhorn_method="sinkhorn",
    maxiter=100,
    verbose=0,
    P0=None,
    normalize=False,
):
    r"""
    Wasserstein Discriminant Analysis :ref:`[11] <references-wda>`

    The function solves the following optimization problem:

    .. math::
        \mathbf{P} = \mathop{\arg \min}_\mathbf{P} \quad
        \frac{\sum\limits_i W(P \mathbf{X}^i, P \mathbf{X}^i)}{\sum\limits_{i, j \neq i} W(P \mathbf{X}^i, P \mathbf{X}^j)}

    where :

    - :math:`P` is a linear projection operator in the Stiefel(`d`, `p`) manifold
    - :math:`W` is the entropic regularized Wasserstein distance
    - :math:`\mathbf{X}^i` are the samples in the dataset corresponding to class i

    **Choosing a Sinkhorn solver**

    By default, and when using a regularization parameter that is not too
    small, the default sinkhorn solver should be enough. If you need to use a
    small regularization to get sparser transport plans, you should use the
    :py:func:`ot.dr.sinkhorn_log` solver that will avoid numerical errors, but
    can be slow in practice.

    Parameters
    ----------
    X : ndarray, shape (n, d)
        Training samples.
    y : ndarray, shape (n,)
        Labels for training samples.
    p : int, optional
        Size of dimensionality reduction.
    reg : float, optional
        Regularization term >0 (entropic regularization)
    solver : None | str, optional
        None for steepest descent, 'TrustRegions' for the trust regions
        algorithm, otherwise a pymanopt solver instance
    sinkhorn_method : str
        method used for the Sinkhorn solver, either 'sinkhorn' or 'sinkhorn_log'
    P0 : ndarray, shape (d, p)
        Initial starting point for projection.
    normalize : bool, optional
        Normalize the Wasserstein distance by the average distance on P0 (default: False)
    verbose : int, optional
        Print information along iterations.

    Returns
    -------
    P : ndarray, shape (d, p)
        Optimal projection matrix for the given parameters
    proj : callable
        Projection function including mean centering.


    .. _references-wda:
    References
    ----------
    .. [11] Flamary, R., Cuturi, M., Courty, N., & Rakotomamonjy, A. (2016).
        Wasserstein Discriminant Analysis. arXiv preprint arXiv:1608.08063.
    """  # noqa
    if sinkhorn_method.lower() == "sinkhorn":
        sinkhorn_solver = sinkhorn
    elif sinkhorn_method.lower() == "sinkhorn_log":
        sinkhorn_solver = sinkhorn_log
    else:
        raise ValueError("Unknown Sinkhorn method '%s'." % sinkhorn_method)

    mx = np.mean(X)
    X -= mx.reshape((1, -1))

    # data split between classes
    d = X.shape[1]
    xc = split_classes(X, y)
    # compute uniform weights
    wc = [np.ones((x.shape[0]), dtype=np.float32) / x.shape[0] for x in xc]

    # pre-compute reg_c,c'
    if P0 is not None and normalize:
        regmean = np.zeros((len(xc), len(xc)))
        for i, xi in enumerate(xc):
            xi = np.dot(xi, P0)
            for j, xj in enumerate(xc[i:]):
                xj = np.dot(xj, P0)
                M = dist(xi, xj)
                regmean[i, j] = np.sum(M) / (len(xi) * len(xj))
    else:
        regmean = np.ones((len(xc), len(xc)))

    manifold = pymanopt.manifolds.Stiefel(d, p)

    @pymanopt.function.autograd(manifold)
    def cost(P):
        # wda loss
        loss_b = 0
        loss_w = 0

        for i, xi in enumerate(xc):
            xi = np.dot(xi, P)
            for j, xj in enumerate(xc[i:]):
                xj = np.dot(xj, P)
                M = dist(xi, xj)
                G = sinkhorn_solver(wc[i], wc[j + i], M, reg * regmean[i, j], k)
                if j == 0:
                    loss_w += np.sum(G * M)
                else:
                    loss_b += np.sum(G * M)

        # loss inversed because minimization
        return loss_w / loss_b

    # declare manifold and problem
    problem = pymanopt.Problem(manifold=manifold, cost=cost)

    # declare solver and solve
    if solver is None:
        solver = pymanopt.optimizers.SteepestDescent(
            max_iterations=maxiter, log_verbosity=verbose
        )
    elif solver in ["tr", "TrustRegions"]:
        solver = pymanopt.optimizers.TrustRegions(
            max_iterations=maxiter, log_verbosity=verbose
        )

    Popt = solver.run(problem, initial_point=P0)

    def proj(X):
        return (X - mx.reshape((1, -1))).dot(Popt.point)

    return Popt.point, proj

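# Illustrative sketch (not part of the library): Wasserstein Discriminant
# Analysis on a small synthetic dataset. Sizes, regularization and number of
# Sinkhorn iterations are arbitrary choices; requires pymanopt and autograd.
def _example_wda():
    import numpy

    rng = numpy.random.RandomState(42)
    n, d = 30, 4
    X = numpy.vstack([rng.randn(n, d), rng.randn(n, d) + 3.0])
    y = numpy.concatenate([numpy.zeros(n), numpy.ones(n)])
    Pwda, projwda = wda(X, y, p=2, reg=1.0, k=10, maxiter=20)
    Xp = projwda(X)                      # samples projected on the learned subspace
    print(Pwda.shape, Xp.shape)          # (4, 2) (60, 2)
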
def projection_robust_wasserstein(
    X,
    Y,
    a,
    b,
    tau,
    U0=None,
    reg=0.1,
    k=2,
    stopThr=1e-3,
    maxiter=100,
    verbose=0,
    random_state=None,
):
    r"""
    Projection Robust Wasserstein Distance :ref:`[32] <references-projection-robust-wasserstein>`

    The function solves the following optimization problem:

    .. math::
        \max_{U \in St(d, k)} \ \min_{\pi \in \Pi(\mu,\nu)} \quad
        \sum_{i,j} \pi_{i,j} \|U^T(\mathbf{x}_i - \mathbf{y}_j)\|^2 - \mathrm{reg} \cdot H(\pi)

    where :

    - :math:`U` is a linear projection operator in the Stiefel(`d`, `k`) manifold
    - :math:`H(\pi)` is the entropy regularizer
    - :math:`\mathbf{x}_i`, :math:`\mathbf{y}_j` are samples of measures :math:`\mu` and :math:`\nu` respectively

    Parameters
    ----------
    X : ndarray, shape (n, d)
        Samples from measure :math:`\mu`
    Y : ndarray, shape (n, d)
        Samples from measure :math:`\nu`
    a : ndarray, shape (n, )
        weights for measure :math:`\mu`
    b : ndarray, shape (n, )
        weights for measure :math:`\nu`
    tau : float
        stepsize for Riemannian Gradient Descent
    U0 : ndarray, shape (d, k)
        Initial starting point for projection.
    reg : float, optional
        Regularization term >0 (entropic regularization)
    k : int
        Subspace dimension
    stopThr : float, optional
        Stop threshold on error (>0)
    verbose : int, optional
        Print information along iterations.
    random_state : int, RandomState instance or None, default=None
        Determines random number generation for initial value of projection
        operator when U0 is not given.

    Returns
    -------
    pi : ndarray, shape (n, n)
        Optimal transportation matrix for the given parameters
    U : ndarray, shape (d, k)
        Projection operator.


    .. _references-projection-robust-wasserstein:
    References
    ----------
    .. [32] Huang, M., Ma S. & Lai L. (2021).
        A Riemannian Block Coordinate Descent Method for Computing the
        Projection Robust Wasserstein Distance, ICML.
    """  # noqa

    # initialization
    n, d = X.shape
    m, d = Y.shape
    a = np.asarray(a, dtype=np.float64)
    b = np.asarray(b, dtype=np.float64)
    u = np.ones(n) / n
    v = np.ones(m) / m
    ones = np.ones((n, m))

    assert d > k

    if U0 is None:
        rng = check_random_state(random_state)
        U = rng.randn(d, k)
        U, _ = np.linalg.qr(U)
    else:
        U = U0

    def Vpi(X, Y, a, b, pi):
        # Return the second order matrix of the displacements:
        # sum_ij { (pi)_ij (X_i-Y_j)(X_i-Y_j)^T }.
        A = X.T.dot(pi).dot(Y)
        return (
            X.T.dot(np.diag(a)).dot(X)
            + Y.T.dot(np.diag(np.sum(pi, 0))).dot(Y)
            - A
            - A.T
        )

    err = 1
    iter = 0

    while err > stopThr and iter < maxiter:
        # Projected cost matrix
        UUT = U.dot(U.T)
        M = (
            np.diag(np.diag(X.dot(UUT.dot(X.T)))).dot(ones)
            + ones.dot(np.diag(np.diag(Y.dot(UUT.dot(Y.T)))))
            - 2 * X.dot(UUT.dot(Y.T))
        )

        A = np.empty(M.shape, dtype=M.dtype)
        np.divide(M, -reg, out=A)
        np.exp(A, out=A)

        # Sinkhorn update
        Ap = (1 / a).reshape(-1, 1) * A
        AtransposeU = np.dot(A.T, u)
        v = np.divide(b, AtransposeU)
        u = 1.0 / np.dot(Ap, v)
        pi = u.reshape((-1, 1)) * A * v.reshape((1, -1))

        V = Vpi(X, Y, a, b, pi)

        # Riemannian gradient descent
        G = 2 / reg * V.dot(U)
        GTU = G.T.dot(U)
        xi = G - U.dot(GTU + GTU.T) / 2  # Riemannian gradient
        U, _ = np.linalg.qr(U + tau * xi)  # Retraction by QR decomposition

        grad_norm = np.linalg.norm(xi)
        err = max(reg * grad_norm, np.linalg.norm(np.sum(pi, 0) - b, 1))

        f_val = np.trace(U.T.dot(V.dot(U)))
        if verbose:
            print("RBCD Iteration: ", iter, " error", err, "\t fval: ", f_val)

        iter = iter + 1

    return pi, U

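# Illustrative sketch (not part of the library): projection robust Wasserstein
# distance between two small empirical measures. The step size ``tau`` and the
# other parameters are arbitrary demonstration choices.
def _example_projection_robust_wasserstein():
    import numpy

    rng = numpy.random.RandomState(0)
    n, d = 20, 6
    X = rng.randn(n, d)
    Y = rng.randn(n, d) + 1.0
    a = numpy.ones(n) / n                # uniform weights on both measures
    b = numpy.ones(n) / n
    pi, U = projection_robust_wasserstein(
        X, Y, a, b, tau=0.002, reg=0.2, k=2, maxiter=50, random_state=0
    )
    print(pi.shape, U.shape)             # (20, 20) (6, 2)
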
def ewca(
    X,
    U0=None,
    reg=1,
    k=2,
    method="BCD",
    sinkhorn_method="sinkhorn",
    stopThr=1e-6,
    maxiter=100,
    maxiter_sink=1000,
    maxiter_MM=10,
    verbose=0,
):
    r"""
    Entropic Wasserstein Component Analysis :ref:`[52] <references-entropic-wasserstein-component_analysis>`.

    The function solves the following optimization problem:

    .. math::
        \mathbf{U} = \mathop{\arg \min}_\mathbf{U} \quad
        W(\mathbf{X}, \mathbf{U}\mathbf{U}^T \mathbf{X})

    where :

    - :math:`\mathbf{U}` is a matrix in the Stiefel(`d`, `k`) manifold
    - :math:`W` is the entropic regularized Wasserstein distance
    - :math:`\mathbf{X}` are the samples

    Parameters
    ----------
    X : ndarray, shape (n, d)
        Samples from measure :math:`\mu`.
    U0 : ndarray, shape (d, k), optional
        Initial starting point for projection.
    reg : float, optional
        Regularization term >0 (entropic regularization).
    k : int, optional
        Subspace dimension.
    method : str, optional
        Either 'BCD' or 'MM' (Block Coordinate Descent or Majorization-Minimization).
        Prefer MM when d is large.
    sinkhorn_method : str
        Method used for the Sinkhorn solver, see :ref:`ot.bregman.sinkhorn` for more details.
    stopThr : float, optional
        Stop threshold on error (>0).
    maxiter : int, optional
        Maximum number of iterations of the BCD/MM.
    maxiter_sink : int, optional
        Maximum number of iterations of the Sinkhorn solver.
    maxiter_MM : int, optional
        Maximum number of iterations of the MM (only used when method='MM').
    verbose : int, optional
        Print information along iterations.

    Returns
    -------
    pi : ndarray, shape (n, n)
        Optimal transportation matrix for the given parameters.
    U : ndarray, shape (d, k)
        Matrix on the Stiefel manifold.


    .. _references-entropic-wasserstein-component_analysis:
    References
    ----------
    .. [52] Collas, A., Vayer, T., Flamary, R., & Breloy, A. (2023).
        Entropic Wasserstein Component Analysis.
    """  # noqa
    n, d = X.shape
    X = X - X.mean(0)

    if U0 is None:
        pca_fitted = PCA(n_components=k).fit(X)
        U = pca_fitted.components_.T
        if method == "MM":
            lambda_scm = pca_fitted.explained_variance_[0]
    else:
        U = U0

    # marginals
    u0 = (1.0 / n) * np.ones(n)

    # print iterations
    if verbose > 0:
        print(
            "{:4s}|{:13s}|{:12s}|{:12s}".format("It.", "Loss", "Crit.", "Thres.")
            + "\n"
            + "-" * 40
        )

    def compute_loss(M, pi, reg):
        return np.sum(M * pi) + reg * np.sum(pi * (np.log(pi) - 1))

    def grassmann_distance(U1, U2):
        proj = U1.T @ U2
        _, s, _ = np.linalg.svd(proj)
        s[s > 1] = 1
        s = np.arccos(s)
        return np.linalg.norm(s)

    # loop
    it = 0
    crit = np.inf
    sinkhorn_warmstart = None

    while (it < maxiter) and (crit > stopThr):
        U_old = U

        # Solve transport
        M = dist_utils(X, (X @ U) @ U.T)
        pi, log_sinkhorn = sinkhorn_bregman(
            u0,
            u0,
            M,
            reg,
            numItermax=maxiter_sink,
            method=sinkhorn_method,
            warmstart=sinkhorn_warmstart,
            warn=False,
            log=True,
        )
        key_warmstart = "warmstart"
        if key_warmstart in log_sinkhorn:
            sinkhorn_warmstart = log_sinkhorn[key_warmstart]
        if (pi >= 1e-300).all():
            loss = compute_loss(M, pi, reg)
        else:
            loss = np.inf

        # Solve PCA
        pi_sym = (pi + pi.T) / 2

        if method == "BCD":
            # block coordinate descent
            S = X.T @ (2 * pi_sym - (1.0 / n) * np.eye(n)) @ X
            _, U = np.linalg.eigh(S)
            U = U[:, ::-1][:, :k]

        elif method == "MM":
            # majorization-minimization
            eig, _ = np.linalg.eigh(pi_sym)
            lambda_pi = eig[0]
            for _ in range(maxiter_MM):
                X_proj = X @ U
                X_T_X_proj = X.T @ X_proj

                R = (1 / n) * X_T_X_proj
                alpha = 1 - 2 * n * lambda_pi
                if alpha > 0:
                    R = alpha * (R - lambda_scm * U)
                else:
                    R = alpha * R
                R = R - (2 * X.T @ (pi_sym @ X_proj)) + (2 * lambda_pi * X_T_X_proj)

                U, _ = np.linalg.qr(R)

        else:
            raise ValueError(f"Unknown method '{method}', use 'BCD' or 'MM'.")

        # stop or not
        it += 1
        crit = grassmann_distance(U_old, U)

        # print
        if verbose > 0:
            print("{:4d}|{:8e}|{:8e}|{:8e}".format(it, loss, crit, stopThr))

    return pi, U
