# Copyright (c) 2018, Mathieu Blondel
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
# Author: Mathieu Blondel
# Remi Flamary <remi.flamary@unice.fr>
# Tianlin Liu <t.liu@unibas.ch>
"""
Smooth and Sparse (KL and L2 regularization) and sparsity-constrained OT solvers.
Implementation of:
Smooth and Sparse Optimal Transport.
Mathieu Blondel, Vivien Seguy, Antoine Rolet.
In Proc. of AISTATS 2018.
https://arxiv.org/abs/1710.06276
(Original code from https://github.com/mblondel/smooth-ot/)
Sparsity-Constrained Optimal Transport.
Liu, T., Puigcerver, J., & Blondel, M. (2023).
Sparsity-constrained optimal transport.
Proceedings of the Eleventh International Conference on
Learning Representations (ICLR).
https://arxiv.org/abs/2209.15466
[17] Blondel, M., Seguy, V., & Rolet, A. (2018). Smooth and Sparse Optimal
Transport. Proceedings of the Twenty-First International Conference on
Artificial Intelligence and Statistics (AISTATS).
[50] Liu, T., Puigcerver, J., & Blondel, M. (2023).
Sparsity-constrained optimal transport.
Proceedings of the Eleventh International Conference on
Learning Representations (ICLR).
"""
import numpy as np
from scipy.optimize import minimize
from .backend import get_backend
import ot
def projection_simplex(V, z=1, axis=None):
r"""Projection of :math:`\mathbf{V}` onto the simplex, scaled by `z`
.. math::
P\left(\mathbf{V}, z\right) = \mathop{\arg \min}_{\substack{\mathbf{y} >= 0 \\ \sum_i \mathbf{y}_i = z}} \quad \|\mathbf{y} - \mathbf{V}\|^2
Parameters
----------
V: ndarray, rank 2
z: float or array
If array, len(z) must be compatible with :math:`\mathbf{V}`
axis: None or int
- axis=None: project :math:`\mathbf{V}` by :math:`P(\mathbf{V}.\mathrm{ravel}(), z)`
- axis=1: project each :math:`\mathbf{V}_i` by :math:`P(\mathbf{V}_i, z_i)`
- axis=0: project each :math:`\mathbf{V}_{:, j}` by :math:`P(\mathbf{V}_{:, j}, z_j)`
Returns
-------
projection: ndarray, shape :math:`\mathbf{V}`.shape
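Examples
--------
A small illustrative example on a toy array (values chosen only for illustration):

>>> import numpy as np
>>> V = np.array([[1.0, 2.0, 3.0], [0.0, 0.5, 0.5]])
>>> P = projection_simplex(V, z=1, axis=1)
>>> np.allclose(P.sum(axis=1), 1.0)
True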
"""
if axis == 1:
n_features = V.shape[1]
U = np.sort(V, axis=1)[:, ::-1]
z = np.ones(len(V)) * z
cssv = np.cumsum(U, axis=1) - z[:, np.newaxis]
ind = np.arange(n_features) + 1
cond = U - cssv / ind > 0
rho = np.count_nonzero(cond, axis=1)
theta = cssv[np.arange(len(V)), rho - 1] / rho
return np.maximum(V - theta[:, np.newaxis], 0)
elif axis == 0:
return projection_simplex(V.T, z, axis=1).T
else:
V = V.ravel().reshape(1, -1)
return projection_simplex(V, z, axis=1).ravel()
class Regularization(object):
r"""Base class for Regularization objects
Notes
-----
This class is not intended for direct use. It serves as the parent class
for concrete regularization implementations.
"""
def __init__(self, gamma=1.0):
"""
Parameters
----------
gamma: float
Regularization parameter.
We recover unregularized OT when gamma -> 0.
"""
self.gamma = gamma
def delta_Omega(self, X):
r"""
Compute :math:`\delta_\Omega(\mathbf{X}_{:, j})` for each :math:`\mathbf{X}_{:, j}`.
.. math::
\delta_\Omega(\mathbf{x}) = \sup_{\mathbf{y} >= 0} \
\mathbf{y}^T \mathbf{x} - \Omega(\mathbf{y})
Parameters
----------
X: array, shape = (len(a), len(b))
Input array.
Returns
-------
v: array, (len(b), )
Values: :math:`\mathbf{v}_j = \delta_\Omega(\mathbf{X}_{:, j})`
G: array, (len(a), len(b))
Gradients: :math:`\mathbf{G}_{:, j} = \nabla \delta_\Omega(\mathbf{X}_{:, j})`
"""
raise NotImplementedError
def max_Omega(self, X, b):
r"""
Compute :math:`\mathrm{max}_{\Omega, j}(\mathbf{X}_{:, j})` for each :math:`\mathbf{X}_{:, j}`.
.. math::
\mathrm{max}_{\Omega, j}(\mathbf{x}) =
\sup_{\substack{\mathbf{y} >= 0 \\ \sum_i \mathbf{y}_i = 1}}
\mathbf{y}^T \mathbf{x} - \frac{1}{\mathbf{b}_j} \Omega(\mathbf{b}_j \mathbf{y})
Parameters
----------
X: array, shape = (len(a), len(b))
Input array.
b: array, shape = (len(b), )
Returns
-------
v: array, (len(b), )
Values: :math:`\mathbf{v}_j = \mathrm{max}_{\Omega, j}(\mathbf{X}_{:, j})`
G: array, (len(a), len(b))
Gradients: :math:`\mathbf{G}_{:, j} = \nabla \mathrm{max}_{\Omega, j}(\mathbf{X}_{:, j})`
"""
raise NotImplementedError
def Omega(self, T):
"""
Compute regularization term.
Parameters
----------
T: array, shape = len(a) x len(b)
Input array.
Returns
-------
value: float
Regularization term.
"""
raise NotImplementedError
class NegEntropy(Regularization):
"""NegEntropy regularization"""
def delta_Omega(self, X):
G = np.exp(X / self.gamma - 1)
val = self.gamma * np.sum(G, axis=0)
return val, G
def max_Omega(self, X, b):
max_X = np.max(X, axis=0) / self.gamma
exp_X = np.exp(X / self.gamma - max_X)
val = self.gamma * (np.log(np.sum(exp_X, axis=0)) + max_X)
val -= self.gamma * np.log(b)
G = exp_X / np.sum(exp_X, axis=0)
return val, G
def Omega(self, T):
return self.gamma * np.sum(T * np.log(T))
class SquaredL2(Regularization):
"""Squared L2 regularization"""
def delta_Omega(self, X):
max_X = np.maximum(X, 0)
val = np.sum(max_X**2, axis=0) / (2 * self.gamma)
G = max_X / self.gamma
return val, G
def max_Omega(self, X, b):
G = projection_simplex(X / (b * self.gamma), axis=0)
val = np.sum(X * G, axis=0)
val -= 0.5 * self.gamma * b * np.sum(G * G, axis=0)
return val, G
def Omega(self, T):
return 0.5 * self.gamma * np.sum(T**2)
class SparsityConstrained(Regularization):
"""Squared L2 regularization with sparsity constraints"""
def __init__(self, max_nz, gamma=1.0):
self.max_nz = max_nz
self.gamma = gamma
def delta_Omega(self, X):
# For each column of X, find entries that are not among the top max_nz.
non_top_indices = np.argpartition(-X, self.max_nz, axis=0)[self.max_nz :]
# Zero out these entries so they do not contribute to the conjugate value or its gradient.
if X.ndim == 1:
X[non_top_indices] = 0.0
else:
X[non_top_indices, np.arange(X.shape[1])] = 0.0
max_X = np.maximum(X, 0)
val = np.sum(max_X**2, axis=0) / (2 * self.gamma)
G = max_X / self.gamma
return val, G
def max_Omega(self, X, b):
# Project the scaled X onto the simplex with sparsity constraint.
G = ot.utils.projection_sparse_simplex(
X / (b * self.gamma), self.max_nz, axis=0
)
val = np.sum(X * G, axis=0)
val -= 0.5 * self.gamma * b * np.sum(G * G, axis=0)
return val, G
def Omega(self, T):
return 0.5 * self.gamma * np.sum(T**2)
def dual_obj_grad(alpha, beta, a, b, C, regul):
r"""
Compute objective value and gradients of dual objective.
Parameters
----------
alpha: array, shape = len(a)
beta: array, shape = len(b)
Current iterate of dual potentials.
a: array, shape = len(a)
b: array, shape = len(b)
Input histograms (should be non-negative and sum to 1).
C: array, shape = (len(a), len(b))
Ground cost matrix.
regul: Regularization object
Should implement a `delta_Omega(X)` method.
Returns
-------
obj: float
Objective value (higher is better).
grad_alpha: array, shape = len(a)
Gradient w.r.t. `alpha`.
grad_beta: array, shape = len(b)
Gradient w.r.t. `beta`.
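Examples
--------
A small illustrative call on a toy problem (values chosen only for illustration):

>>> import numpy as np
>>> a = np.array([0.5, 0.5])
>>> b = np.array([0.5, 0.5])
>>> C = np.array([[0.0, 1.0], [1.0, 0.0]])
>>> obj, g_alpha, g_beta = dual_obj_grad(np.zeros(2), np.zeros(2), a, b, C, SquaredL2(gamma=1.0))
>>> g_alpha.shape, g_beta.shape
((2,), (2,))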
"""
obj = np.dot(alpha, a) + np.dot(beta, b)
grad_alpha = a.copy()
grad_beta = b.copy()
# X[:, j] = alpha + beta[j] - C[:, j]
X = alpha[:, np.newaxis] + beta - C
# val.shape = len(b)
# G.shape = len(a) x len(b)
val, G = regul.delta_Omega(X)
obj -= np.sum(val)
grad_alpha -= G.sum(axis=1)
grad_beta -= G.sum(axis=0)
return obj, grad_alpha, grad_beta
def solve_dual(
a, b, C, regul, method="L-BFGS-B", tol=1e-3, max_iter=500, verbose=False
):
"""
Solve the "smoothed" dual objective.
Parameters
----------
a: array, shape = (len(a), )
b: array, shape = (len(b), )
Input histograms (should be non-negative and sum to 1).
C: array, shape = (len(a), len(b))
Ground cost matrix.
regul: Regularization object
Should implement a `delta_Omega(X)` method.
method: str
Solver to be used (passed to `scipy.optimize.minimize`).
tol: float
Tolerance parameter.
max_iter: int
Maximum number of iterations.
Returns
-------
alpha: array, shape = (len(a), )
beta: array, shape = (len(b), )
Dual potentials.
res: OptimizeResult
Result object returned by `scipy.optimize.minimize`.
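Examples
--------
A minimal illustrative run on a toy problem (values chosen only for illustration):

>>> import numpy as np
>>> a = np.array([0.5, 0.5])
>>> b = np.array([0.5, 0.5])
>>> C = np.array([[0.0, 1.0], [1.0, 0.0]])
>>> regul = SquaredL2(gamma=1.0)
>>> alpha, beta, res = solve_dual(a, b, C, regul)
>>> alpha.shape, beta.shape
((2,), (2,))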
"""
def _func(params):
# Unpack alpha and beta.
alpha = params[: len(a)]
beta = params[len(a) :]
obj, grad_alpha, grad_beta = dual_obj_grad(alpha, beta, a, b, C, regul)
# Pack grad_alpha and grad_beta.
grad = np.concatenate((grad_alpha, grad_beta))
# We need to maximize the dual.
return -obj, -grad
# Unfortunately, `minimize` only supports functions whose argument is a
# vector. So, we need to concatenate alpha and beta.
alpha_init = np.zeros(len(a))
beta_init = np.zeros(len(b))
params_init = np.concatenate((alpha_init, beta_init))
res = minimize(
_func,
params_init,
method=method,
jac=True,
tol=tol,
options=dict(maxiter=max_iter, disp=verbose),
)
alpha = res.x[: len(a)]
beta = res.x[len(a) :]
return alpha, beta, res
def semi_dual_obj_grad(alpha, a, b, C, regul):
"""
Compute objective value and gradient of semi-dual objective.
Parameters
----------
alpha: array, shape = len(a)
Current iterate of semi-dual potentials.
a: array, shape = len(a)
b: array, shape = len(b)
Input histograms (should be non-negative and sum to 1).
C: array, shape = (len(a), len(b))
Ground cost matrix.
regul: Regularization object
Should implement a `max_Omega(X)` method.
Returns
-------
obj: float
Objective value (higher is better).
grad: array, shape = len(a)
Gradient w.r.t. alpha.
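Examples
--------
A small illustrative call on a toy problem (values chosen only for illustration):

>>> import numpy as np
>>> a = np.array([0.5, 0.5])
>>> b = np.array([0.5, 0.5])
>>> C = np.array([[0.0, 1.0], [1.0, 0.0]])
>>> obj, grad = semi_dual_obj_grad(np.zeros(2), a, b, C, SquaredL2(gamma=1.0))
>>> grad.shape
(2,)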
"""
obj = np.dot(alpha, a)
grad = a.copy()
# X[:, j] = alpha - C[:, j]
X = alpha[:, np.newaxis] - C
# val.shape = len(b)
# G.shape = len(a) x len(b)
val, G = regul.max_Omega(X, b)
obj -= np.dot(b, val)
grad -= np.dot(G, b)
return obj, grad
def solve_semi_dual(
a, b, C, regul, method="L-BFGS-B", tol=1e-3, max_iter=500, verbose=False
):
"""
Solve the "smoothed" semi-dual objective.
Parameters
----------
a: array, shape = (len(a), )
b: array, shape = (len(b), )
Input histograms (should be non-negative and sum to 1).
C: array, shape = (len(a), len(b))
Ground cost matrix.
regul: Regularization object
Should implement a `max_Omega(X)` method.
method: str
Solver to be used (passed to `scipy.optimize.minimize`).
tol: float
Tolerance parameter.
max_iter: int
Maximum number of iterations.
Returns
-------
alpha: array, shape = (len(a), )
Semi-dual potentials.
res: OptimizeResult
Result object returned by `scipy.optimize.minimize`.
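Examples
--------
A minimal illustrative run on a toy problem (values chosen only for illustration):

>>> import numpy as np
>>> a = np.array([0.5, 0.5])
>>> b = np.array([0.5, 0.5])
>>> C = np.array([[0.0, 1.0], [1.0, 0.0]])
>>> alpha, res = solve_semi_dual(a, b, C, SquaredL2(gamma=1.0))
>>> alpha.shape
(2,)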
"""
def _func(alpha):
obj, grad = semi_dual_obj_grad(alpha, a, b, C, regul)
# We need to maximize the semi-dual.
return -obj, -grad
alpha_init = np.zeros(len(a))
res = minimize(
_func,
alpha_init,
method=method,
jac=True,
tol=tol,
options=dict(maxiter=max_iter, disp=verbose),
)
return res.x, res
def get_plan_from_dual(alpha, beta, C, regul):
r"""
Retrieve optimal transportation plan from optimal dual potentials.
Parameters
----------
alpha: array, shape = len(a)
beta: array, shape = len(b)
Optimal dual potentials.
C: array, shape = (len(a), len(b))
Ground cost matrix.
regul: Regularization object
Should implement a `delta_Omega(X)` method.
Returns
-------
T: array, shape = (len(a), len(b))
Optimal transportation plan.
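Examples
--------
Illustrative only, with toy dual potentials (values chosen only for illustration):

>>> import numpy as np
>>> C = np.array([[0.0, 1.0], [1.0, 0.0]])
>>> alpha = np.array([0.1, 0.1])
>>> beta = np.array([0.1, 0.1])
>>> T = get_plan_from_dual(alpha, beta, C, SquaredL2(gamma=1.0))
>>> T.shape
(2, 2)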
"""
X = alpha[:, np.newaxis] + beta - C
return regul.delta_Omega(X)[1]
def get_plan_from_semi_dual(alpha, b, C, regul):
r"""
Retrieve optimal transportation plan from optimal semi-dual potentials.
Parameters
----------
alpha: array, shape = len(a)
Optimal semi-dual potentials.
b: array, shape = len(b)
Second input histogram (should be non-negative and sum to 1).
C: array, shape = (len(a), len(b))
Ground cost matrix.
regul: Regularization object
Should implement a `max_Omega(X)` method.
Returns
-------
T: array, shape = (len(a), len(b))
Optimal transportation plan.
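Examples
--------
Illustrative only, with toy semi-dual potentials (values chosen only for illustration):

>>> import numpy as np
>>> b = np.array([0.5, 0.5])
>>> C = np.array([[0.0, 1.0], [1.0, 0.0]])
>>> T = get_plan_from_semi_dual(np.zeros(2), b, C, SquaredL2(gamma=1.0))
>>> np.allclose(T.sum(axis=0), b)
True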
"""
X = alpha[:, np.newaxis] - C
return regul.max_Omega(X, b)[1] * b
def smooth_ot_dual(
a,
b,
M,
reg,
reg_type="l2",
method="L-BFGS-B",
stopThr=1e-9,
numItermax=500,
verbose=False,
log=False,
max_nz=None,
):
r"""
Solve the regularized OT problem in the dual and return the OT matrix
The function solves the smooth relaxed dual formulation (7) in
:ref:`[17] <references-smooth-ot-dual>`:
.. math::
\max_{\alpha,\beta}\quad \mathbf{a}^T\alpha + \mathbf{b}^T\beta -
\sum_j \delta_\Omega \left(\alpha+\beta_j-\mathbf{m}_j \right)
where :
- :math:`\mathbf{m}_j` is the j-th column of the cost matrix
- :math:`\delta_\Omega` is the convex conjugate of the regularization term :math:`\Omega`
- :math:`\mathbf{a}` and :math:`\mathbf{b}` are source and target weights (sum to 1)
The OT matrix can be reconstructed from the gradient of :math:`\delta_\Omega`
(see :ref:`[17] <references-smooth-ot-dual>` Proposition 1).
The optimization algorithm uses gradient descent (L-BFGS by default).
Parameters
----------
a : np.ndarray (ns,)
sample weights in the source domain
b : np.ndarray (nt,)
sample weights in the target domain
M : np.ndarray (ns,nt)
loss matrix
reg : float
Regularization term >0
reg_type : str
Regularization type, can be the following (default ='l2'):
- 'kl' : Kullback Leibler (~ Neg-entropy used in sinkhorn
:ref:`[2] <references-smooth-ot-dual>`)
- 'l2' : Squared Euclidean regularization
- 'sparsity_constrained' : Sparsity-constrained regularization [50]
max_nz : int or None, optional
Used only when reg_type = 'sparsity_constrained' to specify the maximum
number of non-zero entries per column of the optimal plan;
not used for other regularization types.
method : str
Solver to use for scipy.optimize.minimize
numItermax : int, optional
Max number of iterations
stopThr : float, optional
Stop threshold on error (>0)
verbose : bool, optional
Print information along iterations
log : bool, optional
record log if True
Returns
-------
gamma : (ns, nt) ndarray
Optimal transportation matrix for the given parameters
log : dict
log dictionary returned only if log==True in parameters
.. _references-smooth-ot-dual:
References
----------
.. [2] M. Cuturi, Sinkhorn Distances : Lightspeed Computation of Optimal Transport, Advances in Neural Information Processing Systems (NIPS) 26, 2013
.. [17] Blondel, M., Seguy, V., & Rolet, A. (2018). Smooth and Sparse Optimal Transport. Proceedings of the Twenty-First International Conference on Artificial Intelligence and Statistics (AISTATS).
.. [50] Liu, T., Puigcerver, J., & Blondel, M. (2023). Sparsity-constrained optimal transport. Proceedings of the Eleventh International Conference on Learning Representations (ICLR).
See Also
--------
ot.lp.emd : Unregularized OT
ot.sinkhorn : Entropic regularized OT
ot.optim.cg : General regularized OT
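Examples
--------
A minimal illustrative usage on a toy 2x2 problem (values chosen only for illustration):

>>> import numpy as np
>>> import ot
>>> a = np.array([0.5, 0.5])
>>> b = np.array([0.5, 0.5])
>>> M = np.array([[0.0, 1.0], [1.0, 0.0]])
>>> G = ot.smooth.smooth_ot_dual(a, b, M, reg=1.0, reg_type='l2')
>>> G.shape
(2, 2)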
"""
nx = get_backend(a, b, M)
if reg_type.lower() in ["l2", "squaredl2"]:
regul = SquaredL2(gamma=reg)
elif reg_type.lower() in ["entropic", "negentropy", "kl"]:
regul = NegEntropy(gamma=reg)
elif reg_type.lower() in ["sparsity_constrained", "sparsity-constrained"]:
if not isinstance(max_nz, int):
raise ValueError(f"max_nz {max_nz} must be an integer")
regul = SparsityConstrained(gamma=reg, max_nz=max_nz)
else:
raise NotImplementedError("Unknown regularization")
a0, b0, M0 = a, b, M
# convert to numpy
a, b, M = nx.to_numpy(a, b, M)
# solve dual
alpha, beta, res = solve_dual(
a, b, M, regul, max_iter=numItermax, tol=stopThr, verbose=verbose
)
# reconstruct transport matrix
G = nx.from_numpy(get_plan_from_dual(alpha, beta, M, regul), type_as=M0)
if log:
log = {
"alpha": nx.from_numpy(alpha, type_as=a0),
"beta": nx.from_numpy(beta, type_as=b0),
"res": res,
}
return G, log
else:
return G
def smooth_ot_semi_dual(
a,
b,
M,
reg,
reg_type="l2",
max_nz=None,
method="L-BFGS-B",
stopThr=1e-9,
numItermax=500,
verbose=False,
log=False,
):
r"""
Solve the regularized OT problem in the semi-dual and return the OT matrix
The function solves the smooth relaxed dual formulation (10) in
:ref:`[17] <references-smooth-ot-semi-dual>`:
.. math::
\max_{\alpha}\quad \mathbf{a}^T\alpha- \mathrm{OT}_\Omega^*(\alpha, \mathbf{b})
where :
.. math::
\mathrm{OT}_\Omega^*(\alpha, \mathbf{b}) = \sum_j \mathbf{b}_j
\mathrm{max}_{\Omega, j} \left(\alpha - \mathbf{m}_j\right)
- :math:`\mathbf{m}_j` is the j-th column of the cost matrix
- :math:`\mathrm{OT}_\Omega^*(\alpha,b)` is defined in Eq. (9) in
:ref:`[17] <references-smooth-ot-semi-dual>`
- :math:`\mathbf{a}` and :math:`\mathbf{b}` are source and target weights (sum to 1)
The OT matrix can be reconstructed using :ref:`[17] <references-smooth-ot-semi-dual>` Proposition 2.
The optimization algorithm uses gradient descent (L-BFGS by default).
Parameters
----------
a : np.ndarray (ns,)
sample weights in the source domain
b : np.ndarray (nt,)
sample weights in the target domain
M : np.ndarray (ns,nt)
loss matrix
reg : float
Regularization term >0
reg_type : str
Regularization type, can be the following (default ='l2'):
- 'kl' : Kullback Leibler (~ Neg-entropy used in sinkhorn
:ref:`[2] <references-smooth-ot-semi-dual>`)
- 'l2' : Squared Euclidean regularization
- 'sparsity_constrained' : Sparsity-constrained regularization [50]
max_nz : int or None, optional
Used only when reg_type = 'sparsity_constrained' to specify the maximum
number of non-zero entries per column of the optimal plan;
not used for other regularization types.
method : str
Solver to use for scipy.optimize.minimize
numItermax : int, optional
Max number of iterations
stopThr : float, optional
Stop threshold on error (>0)
verbose : bool, optional
Print information along iterations
log : bool, optional
record log if True
Returns
-------
gamma : (ns, nt) ndarray
Optimal transportation matrix for the given parameters
log : dict
log dictionary returned only if log==True in parameters
.. _references-smooth-ot-semi-dual:
References
----------
.. [2] M. Cuturi, Sinkhorn Distances : Lightspeed Computation of Optimal Transport, Advances in Neural Information Processing Systems (NIPS) 26, 2013
.. [17] Blondel, M., Seguy, V., & Rolet, A. (2018). Smooth and Sparse Optimal Transport. Proceedings of the Twenty-First International Conference on Artificial Intelligence and Statistics (AISTATS).
.. [50] Liu, T., Puigcerver, J., & Blondel, M. (2023). Sparsity-constrained optimal transport. Proceedings of the Eleventh International Conference on Learning Representations (ICLR).
See Also
--------
ot.lp.emd : Unregularized OT
ot.sinkhorn : Entropic regularized OT
ot.optim.cg : General regularized OT
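Examples
--------
A minimal illustrative usage on a toy 2x2 problem (values chosen only for illustration):

>>> import numpy as np
>>> import ot
>>> a = np.array([0.5, 0.5])
>>> b = np.array([0.5, 0.5])
>>> M = np.array([[0.0, 1.0], [1.0, 0.0]])
>>> G = ot.smooth.smooth_ot_semi_dual(a, b, M, reg=1.0, reg_type='l2')
>>> np.allclose(G.sum(axis=0), b)
True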
"""
if reg_type.lower() in ["l2", "squaredl2"]:
regul = SquaredL2(gamma=reg)
elif reg_type.lower() in ["entropic", "negentropy", "kl"]:
regul = NegEntropy(gamma=reg)
elif reg_type.lower() in ["sparsity_constrained", "sparsity-constrained"]:
if not isinstance(max_nz, int):
raise ValueError(f"max_nz {max_nz} must be an integer")
regul = SparsityConstrained(gamma=reg, max_nz=max_nz)
else:
raise NotImplementedError("Unknown regularization")
# solve semi-dual
alpha, res = solve_semi_dual(
a, b, M, regul, max_iter=numItermax, tol=stopThr, verbose=verbose
)
# reconstruct transport matrix
G = get_plan_from_semi_dual(alpha, b, M, regul)
if log:
log = {"alpha": alpha, "res": res}
return G, log
else:
return G