# -*- coding: utf-8 -*-
"""
Gromov-Wasserstein and Fused-Gromov-Wasserstein stochastic estimators.
"""
# Author: Rémi Flamary <remi.flamary@unice.fr>
# Tanguy Kerdoncuff <tanguy.kerdoncuff@laposte.net>
#
# License: MIT License
import numpy as np
from ..bregman import sinkhorn
from ..utils import list_to_array, check_random_state
from ..lp import emd_1d, emd
from ..backend import get_backend
def GW_distance_estimation(
C1,
C2,
p,
q,
loss_fun,
T,
nb_samples_p=None,
nb_samples_q=None,
std=True,
random_state=None,
):
r"""
Returns an approximation of the Gromov-Wasserstein loss between :math:`(\mathbf{C_1}, \mathbf{p})` and :math:`(\mathbf{C_2}, \mathbf{q})`
with a fixed transport plan :math:`\mathbf{T}`. To recover an approximation of the Gromov-Wasserstein distance as defined in [13] compute :math:`d_{GW} = \frac{1}{2} \sqrt{\mathbf{GW}}`.
The function gives an unbiased approximation of the following equation:
.. math::
\mathbf{GW} = \sum_{i,j,k,l} L(\mathbf{C_{1}}_{i,k}, \mathbf{C_{2}}_{j,l}) \mathbf{T}_{i,j} \mathbf{T}_{k,l}
Where :
- :math:`\mathbf{C_1}`: Metric cost matrix in the source space
- :math:`\mathbf{C_2}`: Metric cost matrix in the target space
- `L` : Loss function to account for the misfit between the similarity matrices
- :math:`\mathbf{T}`: Matrix with marginal :math:`\mathbf{p}` and :math:`\mathbf{q}`
Parameters
----------
C1 : array-like, shape (ns, ns)
Metric cost matrix in the source space
C2 : array-like, shape (nt, nt)
Metric cost matrix in the target space
p : array-like, shape (ns,)
Distribution in the source space
q : array-like, shape (nt,)
Distribution in the target space
loss_fun : function: :math:`\mathbb{R} \times \mathbb{R} \mapsto \mathbb{R}`
Loss function used for the distance, the transport plan does not depend on the loss function
T : csr or array-like, shape (ns, nt)
Transport plan matrix, either a sparse csr or a dense matrix
nb_samples_p : int, optional
`nb_samples_p` is the number of samples (without replacement) along the first dimension of :math:`\mathbf{T}`
nb_samples_q : int, optional
`nb_samples_q` is the number of samples along the second dimension of :math:`\mathbf{T}`, for each sample along the first
std : bool, optional
Standard deviation associated with the prediction of the gromov-wasserstein cost
random_state : int or RandomState instance, optional
Fix the seed for reproducibility
Returns
-------
: float
Gromov-wasserstein cost
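
Examples
--------
A minimal, illustrative sketch on random data, using the independent
coupling :math:`\mathbf{p} \mathbf{q}^T` as the (non-optimal) fixed plan:

>>> import numpy as np
>>> from ot.gromov import GW_distance_estimation
>>> rng = np.random.RandomState(0)
>>> X, Y = rng.randn(10, 2), rng.randn(8, 2)
>>> C1 = np.linalg.norm(X[:, None] - X[None, :], axis=-1)
>>> C2 = np.linalg.norm(Y[:, None] - Y[None, :], axis=-1)
>>> p, q = np.full(10, 1 / 10), np.full(8, 1 / 8)
>>> T = np.outer(p, q)
>>> loss = lambda a, b: (a - b) ** 2  # squared-error loss, applied elementwise
>>> gw_cost, gw_std = GW_distance_estimation(
...     C1, C2, p, q, loss, T, random_state=42
... )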

References
----------
.. [13] Mémoli, Facundo.
    "Gromov-Wasserstein distances and the metric approach to object matching."
    Foundations of Computational Mathematics 11.4 (2011): 417-487.

.. [14] Kerdoncuff, Tanguy, Emonet, Rémi, Sebban, Marc.
    "Sampled Gromov Wasserstein."
    Machine Learning Journal (MLJ). 2021.
"""
C1, C2, p, q = list_to_array(C1, C2, p, q)
nx = get_backend(C1, C2, p, q)
generator = check_random_state(random_state)
len_p = p.shape[0]
len_q = q.shape[0]
# It is always better to sample from the biggest distribution first.
if len_p < len_q:
p, q = q, p
len_p, len_q = len_q, len_p
C1, C2 = C2, C1
T = T.T
if nb_samples_p is None:
if nx.issparse(T):
# If T is sparse, it probably means that PoGroW was used, so the number of samples is reduced.
nb_samples_p = min(int(5 * (len_p * np.log(len_p)) ** 0.5), len_p)
else:
nb_samples_p = len_p
else:
# Samples along the first dimension are drawn without replacement.
nb_samples_p = min(nb_samples_p, len_p)
if nb_samples_q is None:
nb_samples_q = 1
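# At least 2 samples per row are needed to estimate a standard deviation.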
if std:
nb_samples_q = max(2, nb_samples_q)
index_k = np.zeros((nb_samples_p, nb_samples_q), dtype=int)
index_l = np.zeros((nb_samples_p, nb_samples_q), dtype=int)
index_i = generator.choice(
len_p, size=nb_samples_p, p=nx.to_numpy(p), replace=False
)
index_j = generator.choice(
len_p, size=nb_samples_p, p=nx.to_numpy(p), replace=False
)
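# index_i and index_j are two independent draws of rows from p: together with the
# column sampling below, they yield independent samples of the pairs (i, k) and (j, l) from T.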
for i in range(nb_samples_p):
if nx.issparse(T):
T_indexi = nx.reshape(nx.todense(T[index_i[i], :]), (-1,))
T_indexj = nx.reshape(nx.todense(T[index_j[i], :]), (-1,))
else:
T_indexi = T[index_i[i], :]
T_indexj = T[index_j[i], :]
# For each sampled row, the columns are sampled from the conditional distribution given by that row of T.
index_k[i] = generator.choice(
len_q,
size=nb_samples_q,
p=nx.to_numpy(T_indexi / nx.sum(T_indexi)),
replace=True,
)
index_l[i] = generator.choice(
len_q,
size=nb_samples_q,
p=nx.to_numpy(T_indexj / nx.sum(T_indexj)),
replace=True,
)
list_value_sample = nx.stack(
[
loss_fun(
C1[np.ix_(index_i, index_j)], C2[np.ix_(index_k[:, n], index_l[:, n])]
)
for n in range(nb_samples_q)
],
axis=2,
)
if std:
std_value = nx.sum(nx.std(list_value_sample, axis=2) ** 2) ** 0.5
return nx.mean(list_value_sample), std_value / (nb_samples_p * nb_samples_p)
else:
return nx.mean(list_value_sample)
def pointwise_gromov_wasserstein(
C1,
C2,
p,
q,
loss_fun,
alpha=1,
max_iter=100,
threshold_plan=0,
log=False,
verbose=False,
random_state=None,
):
r"""
Returns the gromov-wasserstein transport between :math:`(\mathbf{C_1}, \mathbf{p})` and :math:`(\mathbf{C_2}, \mathbf{q})` using a stochastic Frank-Wolfe.
This method has a :math:`\mathcal{O}(\mathrm{max\_iter} \times PN^2)` time complexity with `P` the number of Sinkhorn iterations.
The function solves the following optimization problem:
.. math::
\mathbf{GW} = \mathop{\arg \min}_\mathbf{T} \quad \sum_{i,j,k,l}
L(\mathbf{C_1}_{i,k}, \mathbf{C_2}_{j,l}) \mathbf{T}_{i,j} \mathbf{T}_{k,l}
s.t. \ \mathbf{T} \mathbf{1} &= \mathbf{p}
\mathbf{T}^T \mathbf{1} &= \mathbf{q}
\mathbf{T} &\geq 0
Where :
- :math:`\mathbf{C_1}`: Metric cost matrix in the source space
- :math:`\mathbf{C_2}`: Metric cost matrix in the target space
- :math:`\mathbf{p}`: distribution in the source space
- :math:`\mathbf{q}`: distribution in the target space
- `L`: loss function to account for the misfit between the similarity matrices
Parameters
----------
C1 : array-like, shape (ns, ns)
Metric cost matrix in the source space
C2 : array-like, shape (nt, nt)
Metric cost matrix in the target space
p : array-like, shape (ns,)
Distribution in the source space
q : array-like, shape (nt,)
Distribution in the target space
loss_fun : function: :math:`\mathbb{R} \times \mathbb{R} \mapsto \mathbb{R}`
Loss function used for the distance, the transport plan does not depend on the loss function
alpha : float
Step of the Frank-Wolfe algorithm, should be between 0 and 1
max_iter : int, optional
Max number of iterations
threshold_plan : float, optional
Deleting very small values in the transport plan. If above zero, it violates the marginal constraints.
verbose : bool, optional
Print information along iterations
log : bool, optional
Gives the distance estimated and the standard deviation
random_state : int or RandomState instance, optional
Fix the seed for reproducibility
Returns
-------
T : array-like, shape (`ns`, `nt`)
Optimal coupling between the two spaces
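
Examples
--------
A minimal, illustrative sketch on small random point clouds with a
squared-error loss (the returned plan is a sparse matrix):

>>> import numpy as np
>>> from ot.gromov import pointwise_gromov_wasserstein
>>> rng = np.random.RandomState(1)
>>> X, Y = rng.randn(6, 2), rng.randn(5, 2)
>>> C1 = np.linalg.norm(X[:, None] - X[None, :], axis=-1)
>>> C2 = np.linalg.norm(Y[:, None] - Y[None, :], axis=-1)
>>> p, q = np.full(6, 1 / 6), np.full(5, 1 / 5)
>>> loss = lambda a, b: (a - b) ** 2
>>> T = pointwise_gromov_wasserstein(
...     C1, C2, p, q, loss, max_iter=20, random_state=0
... )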

References
----------
.. [14] Kerdoncuff, Tanguy, Emonet, Rémi, Sebban, Marc.
    "Sampled Gromov Wasserstein."
    Machine Learning Journal (MLJ). 2021.
"""
C1, C2, p, q = list_to_array(C1, C2, p, q)
nx = get_backend(C1, C2, p, q)
len_p = p.shape[0]
len_q = q.shape[0]
generator = check_random_state(random_state)
index = np.zeros(2, dtype=int)
# Initialize the plan by solving a 1D OT problem between a random row of C1 and a random row of C2, sampled from the marginals.
index[0] = generator.choice(len_p, size=1, p=nx.to_numpy(p))
index[1] = generator.choice(len_q, size=1, p=nx.to_numpy(q))
T = nx.tocsr(emd_1d(C1[index[0]], C2[index[1]], a=p, b=q, dense=False))
best_gw_dist_estimated = np.inf
for cpt in range(max_iter):
index[0] = generator.choice(len_p, size=1, p=nx.to_numpy(p))
T_index0 = nx.reshape(nx.todense(T[index[0], :]), (-1,))
index[1] = generator.choice(
len_q, size=1, p=nx.to_numpy(T_index0 / nx.sum(T_index0))
)
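# Frank-Wolfe step: a 1D OT problem between the sampled rows of C1 and C2 gives
# the descent direction, which is mixed into T with step size alpha.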
if alpha == 1:
T = nx.tocsr(emd_1d(C1[index[0]], C2[index[1]], a=p, b=q, dense=False))
else:
new_T = nx.tocsr(emd_1d(C1[index[0]], C2[index[1]], a=p, b=q, dense=False))
T = (1 - alpha) * T + alpha * new_T
# To limit the number of non-zero entries, values below the threshold are set to 0.
T = nx.eliminate_zeros(T, threshold=threshold_plan)
if cpt % 10 == 0 or cpt == (max_iter - 1):
gw_dist_estimated = GW_distance_estimation(
C1=C1,
C2=C2,
loss_fun=loss_fun,
p=p,
q=q,
T=T,
std=False,
random_state=generator,
)
if gw_dist_estimated < best_gw_dist_estimated:
best_gw_dist_estimated = gw_dist_estimated
best_T = nx.copy(T)
if verbose:
if cpt % 200 == 0:
print(
"{:5s}|{:12s}".format("It.", "Best gw estimated")
+ "\n"
+ "-" * 19
)
print("{:5d}|{:8e}|".format(cpt, best_gw_dist_estimated))
if log:
log = {}
log["gw_dist_estimated"], log["gw_dist_std"] = GW_distance_estimation(
C1=C1, C2=C2, loss_fun=loss_fun, p=p, q=q, T=best_T, random_state=generator
)
return best_T, log
return best_T
def sampled_gromov_wasserstein(
C1,
C2,
p,
q,
loss_fun,
nb_samples_grad=100,
epsilon=1,
max_iter=500,
log=False,
verbose=False,
random_state=None,
):
r"""
Returns the gromov-wasserstein transport between :math:`(\mathbf{C_1}, \mathbf{p})` and :math:`(\mathbf{C_2}, \mathbf{q})` using a 1-stochastic Frank-Wolfe.
This method has a :math:`\mathcal{O}(\mathrm{max\_iter} \times N \log(N))` time complexity by relying on the 1D Optimal Transport solver.
The function solves the following optimization problem:
.. math::
\mathbf{GW} = \mathop{\arg \min}_\mathbf{T} \quad \sum_{i,j,k,l}
L(\mathbf{C_1}_{i,k}, \mathbf{C_2}_{j,l}) \mathbf{T}_{i,j} \mathbf{T}_{k,l}
s.t. \ \mathbf{T} \mathbf{1} &= \mathbf{p}
\mathbf{T}^T \mathbf{1} &= \mathbf{q}
\mathbf{T} &\geq 0
Where :
- :math:`\mathbf{C_1}`: Metric cost matrix in the source space
- :math:`\mathbf{C_2}`: Metric cost matrix in the target space
- :math:`\mathbf{p}`: distribution in the source space
- :math:`\mathbf{q}`: distribution in the target space
- `L`: loss function to account for the misfit between the similarity matrices
Parameters
----------
C1 : array-like, shape (ns, ns)
Metric cost matrix in the source space
C2 : array-like, shape (nt, nt)
Metric cost matrix in the target space
p : array-like, shape (ns,)
Distribution in the source space
q : array-like, shape (nt,)
Distribution in the target space
loss_fun : function: :math:`\mathbb{R} \times \mathbb{R} \mapsto \mathbb{R}`
Loss function used for the distance, the transport plan does not depend on the loss function
nb_samples_grad : int
Number of samples to approximate the gradient
epsilon : float
Weight of the Kullback-Leibler regularization
max_iter : int, optional
Max number of iterations
verbose : bool, optional
Print information along iterations
log : bool, optional
Gives the distance estimated and the standard deviation
random_state : int or RandomState instance, optional
Fix the seed for reproducibility
Returns
-------
T : array-like, shape (`ns`, `nt`)
Optimal coupling between the two spaces
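
Examples
--------
A minimal, illustrative sketch on small random point clouds with a
squared-error loss:

>>> import numpy as np
>>> from ot.gromov import sampled_gromov_wasserstein
>>> rng = np.random.RandomState(2)
>>> X, Y = rng.randn(6, 2), rng.randn(5, 2)
>>> C1 = np.linalg.norm(X[:, None] - X[None, :], axis=-1)
>>> C2 = np.linalg.norm(Y[:, None] - Y[None, :], axis=-1)
>>> p, q = np.full(6, 1 / 6), np.full(5, 1 / 5)
>>> loss = lambda a, b: (a - b) ** 2
>>> T, log = sampled_gromov_wasserstein(
...     C1, C2, p, q, loss, nb_samples_grad=6, epsilon=0.1,
...     max_iter=50, log=True, random_state=0
... )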

References
----------
.. [14] Kerdoncuff, Tanguy, Emonet, Rémi, Sebban, Marc.
    "Sampled Gromov Wasserstein."
    Machine Learning Journal (MLJ). 2021.
"""
C1, C2, p, q = list_to_array(C1, C2, p, q)
nx = get_backend(C1, C2, p, q)
len_p = p.shape[0]
len_q = q.shape[0]
generator = check_random_state(random_state)
# The most natural way to define nb_samples_grad is with a single integer.
if isinstance(nb_samples_grad, int):
if nb_samples_grad > len_p:
# Sampling along the first dimension is done without replacement, so the remaining
# sampling budget is carried over to the second dimension.
nb_samples_grad_p, nb_samples_grad_q = len_p, nb_samples_grad // len_p
else:
nb_samples_grad_p, nb_samples_grad_q = nb_samples_grad, 1
else:
nb_samples_grad_p, nb_samples_grad_q = nb_samples_grad
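# Initialize with the independent coupling p q^T.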
T = nx.outer(p, q)
# continue_loop allows stopping the loop after several successive small modifications of T.
continue_loop = 0
# The gradient of GW is more complex if the two matrices are not symmetric.
C_are_symmetric = nx.allclose(C1, C1.T, rtol=1e-10, atol=1e-10) and nx.allclose(
C2, C2.T, rtol=1e-10, atol=1e-10
)
for cpt in range(max_iter):
index0 = generator.choice(
len_p, size=nb_samples_grad_p, p=nx.to_numpy(p), replace=False
)
Lik = 0
for i, index0_i in enumerate(index0):
index1 = generator.choice(
len_q,
size=nb_samples_grad_q,
p=nx.to_numpy(T[index0_i, :] / nx.sum(T[index0_i, :])),
replace=False,
)
# If the matrices C are not symmetric, the gradient has two terms; one of them is chosen at random.
if (not C_are_symmetric) and generator.rand(1) > 0.5:
Lik += nx.mean(
loss_fun(
C1[:, [index0[i]] * nb_samples_grad_q][:, None, :],
C2[:, index1][None, :, :],
),
axis=2,
)
else:
Lik += nx.mean(
loss_fun(
C1[[index0[i]] * nb_samples_grad_q, :][:, :, None],
C2[index1, :][:, None, :],
),
axis=0,
)
max_Lik = nx.max(Lik)
if max_Lik == 0:
continue
# This division by the max is here to facilitate the choice of epsilon.
Lik /= max_Lik
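# With epsilon > 0 the linearized problem is solved with Sinkhorn; subtracting
# epsilon * log(T) makes this a KL-proximal step around the current plan
# (up to constants under the marginal constraints).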
if epsilon > 0:
# Clip T to [exp(-200), 1] before taking the log, then set the log of the clipped entries to -inf, to avoid log(0).
log_T = nx.log(nx.clip(T, np.exp(-200), 1))
log_T = nx.where(log_T == -200, -np.inf, log_T)
Lik = Lik - epsilon * log_T
try:
new_T = sinkhorn(a=p, b=q, M=Lik, reg=epsilon)
except (RuntimeWarning, UserWarning):
print("Warning caught in Sinkhorn: Return last stable T")
break
else:
new_T = emd(a=p, b=q, M=Lik)
change_T = nx.mean((T - new_T) ** 2)
if change_T <= 10e-20:
continue_loop += 1
if continue_loop > 100: # Number max of low modifications of T
T = nx.copy(new_T)
break
else:
continue_loop = 0
if verbose and cpt % 10 == 0:
if cpt % 200 == 0:
print(
"{:5s}|{:12s}".format("It.", "||T_n - T_{n+1}||") + "\n" + "-" * 19
)
print("{:5d}|{:8e}|".format(cpt, change_T))
T = nx.copy(new_T)
if log:
log = {}
log["gw_dist_estimated"], log["gw_dist_std"] = GW_distance_estimation(
C1=C1, C2=C2, loss_fun=loss_fun, p=p, q=q, T=T, random_state=generator
)
return T, log
return T