Source code for ot.bregman._empirical

# -*- coding: utf-8 -*-
"""
Bregman projections solvers for entropic regularized OT for empirical distributions
"""

# Author: Remi Flamary <remi.flamary@unice.fr>
#         Kilian Fatras <kilian.fatras@irisa.fr>
#         Quang Huy Tran <quang-huy.tran@univ-ubs.fr>
#
# License: MIT License

import warnings

from ..utils import dist, list_to_array, unif, LazyTensor
from ..backend import get_backend

from ._sinkhorn import sinkhorn, sinkhorn2


def get_sinkhorn_lazytensor(X_a, X_b, f, g, metric="sqeuclidean", reg=1e-1, nx=None):
    r"""Get a LazyTensor of Sinkhorn solution from the dual potentials

    The returned LazyTensor is
    :math:`\mathbf{T} = exp(  \mathbf{f} \mathbf{1}_b^\top + \mathbf{1}_a \mathbf{g}^\top - \mathbf{C}/reg)`, where :math:`\mathbf{C}` is the pairwise metric matrix between samples :math:`\mathbf{X}_a` and :math:`\mathbf{X}_b`.

    Parameters
    ----------
    X_a : array-like, shape (n_samples_a, dim)
        samples in the source domain
    X_b : array-like, shape (n_samples_b, dim)
        samples in the target domain
    f : array-like, shape (n_samples_a,)
        First dual potentials (log space)
    g : array-like, shape (n_samples_b,)
        Second dual potentials (log space)
    metric : str, default='sqeuclidean'
        Metric used for the cost matrix computation
    reg : float, default=1e-1
        Regularization term >0
    nx : Backend(), default=None
        Numerical backend used


    Returns
    -------
    T : LazyTensor
        Sinkhorn solution tensor
    """

    if nx is None:
        nx = get_backend(X_a, X_b, f, g)

    shape = (X_a.shape[0], X_b.shape[0])

    def func(i, j, X_a, X_b, f, g, metric, reg):
        C = dist(X_a[i], X_b[j], metric=metric)
        return nx.exp(f[i, None] + g[None, j] - C / reg)

    T = LazyTensor(shape, func, X_a=X_a, X_b=X_b, f=f, g=g, metric=metric, reg=reg)

    return T


[docs] def empirical_sinkhorn( X_s, X_t, reg, a=None, b=None, metric="sqeuclidean", numIterMax=10000, stopThr=1e-9, isLazy=False, batchSize=100, verbose=False, log=False, warn=True, warmstart=None, **kwargs, ): r""" Solve the entropic regularization optimal transport problem and return the OT matrix from empirical data The function solves the following optimization problem: .. math:: \gamma = \mathop{\arg \min}_\gamma \quad \langle \gamma, \mathbf{M} \rangle_F + \mathrm{reg} \cdot\Omega(\gamma) s.t. \ \gamma \mathbf{1} &= \mathbf{a} \gamma^T \mathbf{1} &= \mathbf{b} \gamma &\geq 0 where : - :math:`\mathbf{M}` is the (`n_samples_a`, `n_samples_b`) metric cost matrix - :math:`\Omega` is the entropic regularization term :math:`\Omega(\gamma)=\sum_{i,j} \gamma_{i,j}\log(\gamma_{i,j})` - :math:`\mathbf{a}` and :math:`\mathbf{b}` are source and target weights (sum to 1) Parameters ---------- X_s : array-like, shape (n_samples_a, dim) samples in the source domain X_t : array-like, shape (n_samples_b, dim) samples in the target domain reg : float Regularization term >0 a : array-like, shape (n_samples_a,) samples weights in the source domain b : array-like, shape (n_samples_b,) samples weights in the target domain numItermax : int, optional Max number of iterations stopThr : float, optional Stop threshold on error (>0) isLazy: boolean, optional If True, then only calculate the cost matrix by block and return the dual potentials only (to save memory). If False, calculate full cost matrix and return outputs of sinkhorn function. batchSize: int or tuple of 2 int, optional Size of the batches used to compute the sinkhorn update without memory overhead. When a tuple is provided it sets the size of the left/right batches. verbose : bool, optional Print information along iterations log : bool, optional record log if True warn : bool, optional if True, raises a warning if the algorithm doesn't convergence. warmstart: tuple of arrays, shape (dim_a, dim_b), optional Initialization of dual potentials. If provided, the dual potentials should be given (that is the logarithm of the u,v sinkhorn scaling vectors) Returns ------- gamma : array-like, shape (n_samples_a, n_samples_b) Regularized optimal transportation matrix for the given parameters log : dict log dictionary return only if log==True in parameters Examples -------- >>> import numpy as np >>> n_samples_a = 2 >>> n_samples_b = 2 >>> reg = 0.1 >>> X_s = np.reshape(np.arange(n_samples_a, dtype=np.float64), (n_samples_a, 1)) >>> X_t = np.reshape(np.arange(0, n_samples_b, dtype=np.float64), (n_samples_b, 1)) >>> empirical_sinkhorn(X_s, X_t, reg=reg, verbose=False) # doctest: +NORMALIZE_WHITESPACE array([[4.99977301e-01, 2.26989344e-05], [2.26989344e-05, 4.99977301e-01]]) References ---------- .. [2] M. Cuturi, Sinkhorn Distances : Lightspeed Computation of Optimal Transport, Advances in Neural Information Processing Systems (NIPS) 26, 2013 .. [9] Schmitzer, B. (2016). Stabilized Sparse Scaling Algorithms for Entropy Regularized Transport Problems. arXiv preprint arXiv:1610.06519. .. [10] Chizat, L., Peyré, G., Schmitzer, B., & Vialard, F. X. (2016). Scaling algorithms for unbalanced transport problems. arXiv preprint arXiv:1607.05816. """ X_s, X_t = list_to_array(X_s, X_t) nx = get_backend(X_s, X_t) ns, nt = X_s.shape[0], X_t.shape[0] if a is None: a = nx.from_numpy(unif(ns), type_as=X_s) if b is None: b = nx.from_numpy(unif(nt), type_as=X_s) if isLazy: if log: dict_log = {"err": []} log_a, log_b = nx.log(a), nx.log(b) if warmstart is None: f, g = nx.zeros((ns,), type_as=a), nx.zeros((nt,), type_as=a) else: f, g = warmstart if isinstance(batchSize, int): bs, bt = batchSize, batchSize elif isinstance(batchSize, tuple) and len(batchSize) == 2: bs, bt = batchSize[0], batchSize[1] else: raise ValueError("Batch size must be in integer or a tuple of two integers") range_s, range_t = range(0, ns, bs), range(0, nt, bt) lse_f = nx.zeros((ns,), type_as=a) lse_g = nx.zeros((nt,), type_as=a) X_s_np = nx.to_numpy(X_s) X_t_np = nx.to_numpy(X_t) for i_ot in range(numIterMax): lse_f_cols = [] for i in range_s: M = dist(X_s_np[i : i + bs, :], X_t_np, metric=metric) M = nx.from_numpy(M, type_as=a) lse_f_cols.append(nx.logsumexp(g[None, :] - M / reg, axis=1)) lse_f = nx.concatenate(lse_f_cols, axis=0) f = log_a - lse_f lse_g_cols = [] for j in range_t: M = dist(X_s_np, X_t_np[j : j + bt, :], metric=metric) M = nx.from_numpy(M, type_as=a) lse_g_cols.append(nx.logsumexp(f[:, None] - M / reg, axis=0)) lse_g = nx.concatenate(lse_g_cols, axis=0) g = log_b - lse_g if (i_ot + 1) % 10 == 0: m1_cols = [] for i in range_s: M = dist(X_s_np[i : i + bs, :], X_t_np, metric=metric) M = nx.from_numpy(M, type_as=a) m1_cols.append( nx.sum( nx.exp(f[i : i + bs, None] + g[None, :] - M / reg), axis=1 ) ) m1 = nx.concatenate(m1_cols, axis=0) err = nx.sum(nx.abs(m1 - a)) if log: dict_log["err"].append(err) if verbose and (i_ot + 1) % 100 == 0: print( "Error in marginal at iteration {} = {}".format(i_ot + 1, err) ) if err <= stopThr: break else: if warn: warnings.warn( "Sinkhorn did not converge. You might want to " "increase the number of iterations `numItermax` " "or the regularization parameter `reg`." ) if log: dict_log["u"] = f dict_log["v"] = g dict_log["niter"] = i_ot dict_log["lazy_plan"] = get_sinkhorn_lazytensor(X_s, X_t, f, g, metric, reg) return (f, g, dict_log) else: return (f, g) else: M = dist(X_s, X_t, metric=metric) if log: pi, log = sinkhorn( a, b, M, reg, numItermax=numIterMax, stopThr=stopThr, verbose=verbose, log=True, warmstart=warmstart, **kwargs, ) return pi, log else: pi = sinkhorn( a, b, M, reg, numItermax=numIterMax, stopThr=stopThr, verbose=verbose, log=False, warmstart=warmstart, **kwargs, ) return pi
[docs] def empirical_sinkhorn2( X_s, X_t, reg, a=None, b=None, metric="sqeuclidean", numIterMax=10000, stopThr=1e-9, isLazy=False, batchSize=100, verbose=False, log=False, warn=True, warmstart=None, **kwargs, ): r""" Solve the entropic regularization optimal transport problem from empirical data and return the OT loss The function solves the following optimization problem: .. math:: W = \min_\gamma \quad \langle \gamma, \mathbf{M} \rangle_F + \mathrm{reg} \cdot\Omega(\gamma) s.t. \ \gamma \mathbf{1} &= \mathbf{a} \gamma^T \mathbf{1} &= \mathbf{b} \gamma &\geq 0 where : - :math:`\mathbf{M}` is the (`n_samples_a`, `n_samples_b`) metric cost matrix - :math:`\Omega` is the entropic regularization term :math:`\Omega(\gamma)=\sum_{i,j} \gamma_{i,j}\log(\gamma_{i,j})` - :math:`\mathbf{a}` and :math:`\mathbf{b}` are source and target weights (sum to 1) and returns :math:`\langle \gamma^*, \mathbf{M} \rangle_F` (without the entropic contribution). Parameters ---------- X_s : array-like, shape (n_samples_a, dim) samples in the source domain X_t : array-like, shape (n_samples_b, dim) samples in the target domain reg : float Regularization term >0 a : array-like, shape (n_samples_a,) samples weights in the source domain b : array-like, shape (n_samples_b,) samples weights in the target domain numItermax : int, optional Max number of iterations stopThr : float, optional Stop threshold on error (>0) isLazy: boolean, optional If True, then only calculate the cost matrix by block and return the dual potentials only (to save memory). If False, calculate full cost matrix and return outputs of sinkhorn function. batchSize: int or tuple of 2 int, optional Size of the batches used to compute the sinkhorn update without memory overhead. When a tuple is provided it sets the size of the left/right batches. verbose : bool, optional Print information along iterations log : bool, optional record log if True warn : bool, optional if True, raises a warning if the algorithm doesn't convergence. warmstart: tuple of arrays, shape (dim_a, dim_b), optional Initialization of dual potentials. If provided, the dual potentials should be given (that is the logarithm of the u,v sinkhorn scaling vectors) Returns ------- W : (n_hists) array-like or float Optimal transportation loss for the given parameters log : dict log dictionary return only if log==True in parameters Examples -------- >>> import numpy as np >>> n_samples_a = 2 >>> n_samples_b = 2 >>> reg = 0.1 >>> X_s = np.reshape(np.arange(n_samples_a, dtype=np.float64), (n_samples_a, 1)) >>> X_t = np.reshape(np.arange(0, n_samples_b, dtype=np.float64), (n_samples_b, 1)) >>> b = np.full((n_samples_b, 3), 1/n_samples_b) >>> empirical_sinkhorn2(X_s, X_t, b=b, reg=reg, verbose=False) array([4.53978687e-05, 4.53978687e-05, 4.53978687e-05]) References ---------- .. [2] M. Cuturi, Sinkhorn Distances : Lightspeed Computation of Optimal Transport, Advances in Neural Information Processing Systems (NIPS) 26, 2013 .. [9] Schmitzer, B. (2016). Stabilized Sparse Scaling Algorithms for Entropy Regularized Transport Problems. arXiv preprint arXiv:1610.06519. .. [10] Chizat, L., Peyré, G., Schmitzer, B., & Vialard, F. X. (2016). Scaling algorithms for unbalanced transport problems. arXiv preprint arXiv:1607.05816. """ X_s, X_t = list_to_array(X_s, X_t) nx = get_backend(X_s, X_t) ns, nt = X_s.shape[0], X_t.shape[0] if a is None: a = nx.from_numpy(unif(ns), type_as=X_s) if b is None: b = nx.from_numpy(unif(nt), type_as=X_s) if isLazy: if log: f, g, dict_log = empirical_sinkhorn( X_s, X_t, reg, a, b, metric, numIterMax=numIterMax, stopThr=stopThr, isLazy=isLazy, batchSize=batchSize, verbose=verbose, log=log, warn=warn, warmstart=warmstart, ) else: f, g = empirical_sinkhorn( X_s, X_t, reg, a, b, metric, numIterMax=numIterMax, stopThr=stopThr, isLazy=isLazy, batchSize=batchSize, verbose=verbose, log=log, warn=warn, warmstart=warmstart, ) bs = batchSize if isinstance(batchSize, int) else batchSize[0] range_s = range(0, ns, bs) loss = 0 X_s_np = nx.to_numpy(X_s) X_t_np = nx.to_numpy(X_t) for i in range_s: M_block = dist(X_s_np[i : i + bs, :], X_t_np, metric=metric) M_block = nx.from_numpy(M_block, type_as=a) pi_block = nx.exp(f[i : i + bs, None] + g[None, :] - M_block / reg) loss += nx.sum(M_block * pi_block) if log: return loss, dict_log else: return loss else: M = dist(X_s, X_t, metric=metric) if log: sinkhorn_loss, log = sinkhorn2( a, b, M, reg, numItermax=numIterMax, stopThr=stopThr, verbose=verbose, log=log, warn=warn, warmstart=warmstart, **kwargs, ) return sinkhorn_loss, log else: sinkhorn_loss = sinkhorn2( a, b, M, reg, numItermax=numIterMax, stopThr=stopThr, verbose=verbose, log=log, warn=warn, warmstart=warmstart, **kwargs, ) return sinkhorn_loss
[docs] def empirical_sinkhorn_divergence( X_s, X_t, reg, a=None, b=None, metric="sqeuclidean", numIterMax=10000, stopThr=1e-9, verbose=False, log=False, warn=True, warmstart=None, **kwargs, ): r""" Compute the sinkhorn divergence loss from empirical data The function solves the following optimization problems and return the sinkhorn divergence :math:`S`: .. math:: W &= \min_\gamma \quad \langle \gamma, \mathbf{M} \rangle_F + \mathrm{reg} \cdot\Omega(\gamma) W_a &= \min_{\gamma_a} \quad \langle \gamma_a, \mathbf{M_a} \rangle_F + \mathrm{reg} \cdot\Omega(\gamma_a) W_b &= \min_{\gamma_b} \quad \langle \gamma_b, \mathbf{M_b} \rangle_F + \mathrm{reg} \cdot\Omega(\gamma_b) S &= W - \frac{W_a + W_b}{2} .. math:: s.t. \ \gamma \mathbf{1} &= \mathbf{a} \gamma^T \mathbf{1} &= \mathbf{b} \gamma &\geq 0 \gamma_a \mathbf{1} &= \mathbf{a} \gamma_a^T \mathbf{1} &= \mathbf{a} \gamma_a &\geq 0 \gamma_b \mathbf{1} &= \mathbf{b} \gamma_b^T \mathbf{1} &= \mathbf{b} \gamma_b &\geq 0 where : - :math:`\mathbf{M}` (resp. :math:`\mathbf{M_a}`, :math:`\mathbf{M_b}`) is the (`n_samples_a`, `n_samples_b`) metric cost matrix (resp (`n_samples_a, n_samples_a`) and (`n_samples_b`, `n_samples_b`)) - :math:`\Omega` is the entropic regularization term :math:`\Omega(\gamma)=\sum_{i,j} \gamma_{i,j}\log(\gamma_{i,j})` - :math:`\mathbf{a}` and :math:`\mathbf{b}` are source and target weights (sum to 1) and returns :math:`\langle \gamma^*, \mathbf{M} \rangle_F -(\langle \gamma^*_a, \mathbf{M_a} \rangle_F + \langle \gamma^*_b , \mathbf{M_b} \rangle_F)/2`. .. note: The current implementation does not account for the entropic contributions and thus differs from the Sinkhorn divergence as introduced in the literature. The possibility to account for the entropic contributions will be provided in a future release. Parameters ---------- X_s : array-like, shape (n_samples_a, dim) samples in the source domain X_t : array-like, shape (n_samples_b, dim) samples in the target domain reg : float Regularization term >0 a : array-like, shape (n_samples_a,) samples weights in the source domain b : array-like, shape (n_samples_b,) samples weights in the target domain numItermax : int, optional Max number of iterations stopThr : float, optional Stop threshold on error (>0) verbose : bool, optional Print information along iterations log : bool, optional record log if True warn : bool, optional if True, raises a warning if the algorithm doesn't convergence. warmstart: tuple of arrays, shape (dim_a, dim_b), optional Initialization of dual potentials. If provided, the dual potentials should be given (that is the logarithm of the u,v sinkhorn scaling vectors) Returns ------- W : (1,) array-like Optimal transportation symmetrized loss for the given parameters log : dict log dictionary return only if log==True in parameters Examples -------- >>> import numpy as np >>> n_samples_a = 2 >>> n_samples_b = 4 >>> reg = 0.1 >>> X_s = np.reshape(np.arange(n_samples_a, dtype=np.float64), (n_samples_a, 1)) >>> X_t = np.reshape(np.arange(0, n_samples_b, dtype=np.float64), (n_samples_b, 1)) >>> empirical_sinkhorn_divergence(X_s, X_t, reg) # doctest: +ELLIPSIS 1.499887176049052 References ---------- .. [23] Aude Genevay, Gabriel Peyré, Marco Cuturi, Learning Generative Models with Sinkhorn Divergences, Proceedings of the Twenty-First International Conference on Artificial Intelligence and Statistics, (AISTATS) 21, 2018 """ X_s, X_t = list_to_array(X_s, X_t) nx = get_backend(X_s, X_t) if warmstart is None: warmstart_a, warmstart_b = None, None else: u, v = warmstart warmstart_a = (u, u) warmstart_b = (v, v) if log: sinkhorn_loss_ab, log_ab = empirical_sinkhorn2( X_s, X_t, reg, a, b, metric=metric, numIterMax=numIterMax, stopThr=stopThr, verbose=verbose, log=log, warn=warn, warmstart=warmstart, **kwargs, ) sinkhorn_loss_a, log_a = empirical_sinkhorn2( X_s, X_s, reg, a, a, metric=metric, numIterMax=numIterMax, stopThr=stopThr, verbose=verbose, log=log, warn=warn, warmstart=warmstart_a, **kwargs, ) sinkhorn_loss_b, log_b = empirical_sinkhorn2( X_t, X_t, reg, b, b, metric=metric, numIterMax=numIterMax, stopThr=stopThr, verbose=verbose, log=log, warn=warn, warmstart=warmstart_b, **kwargs, ) sinkhorn_div = sinkhorn_loss_ab - 0.5 * (sinkhorn_loss_a + sinkhorn_loss_b) log = {} log["sinkhorn_loss_ab"] = sinkhorn_loss_ab log["sinkhorn_loss_a"] = sinkhorn_loss_a log["sinkhorn_loss_b"] = sinkhorn_loss_b log["log_sinkhorn_ab"] = log_ab log["log_sinkhorn_a"] = log_a log["log_sinkhorn_b"] = log_b return nx.maximum(0, sinkhorn_div), log else: sinkhorn_loss_ab = empirical_sinkhorn2( X_s, X_t, reg, a, b, metric=metric, numIterMax=numIterMax, stopThr=stopThr, verbose=verbose, log=log, warn=warn, warmstart=warmstart, **kwargs, ) sinkhorn_loss_a = empirical_sinkhorn2( X_s, X_s, reg, a, a, metric=metric, numIterMax=numIterMax, stopThr=stopThr, verbose=verbose, log=log, warn=warn, warmstart=warmstart_a, **kwargs, ) sinkhorn_loss_b = empirical_sinkhorn2( X_t, X_t, reg, b, b, metric=metric, numIterMax=numIterMax, stopThr=stopThr, verbose=verbose, log=log, warn=warn, warmstart=warmstart_b, **kwargs, ) sinkhorn_div = sinkhorn_loss_ab - 0.5 * (sinkhorn_loss_a + sinkhorn_loss_b) return nx.maximum(0, sinkhorn_div)