# -*- coding: utf-8 -*-
"""
Optimal transport for Gaussian Mixtures
"""
# Author: Eloi Tanguy <eloi.tanguy@u-paris>
# Remi Flamary <remi.flamary@polytehnique.edu>
# Julie Delon <julie.delon@math.cnrs.fr>
#
# License: MIT License
from .backend import get_backend
from .lp import emd2, emd
import numpy as np
from .utils import dist
from .gaussian import bures_wasserstein_mapping
[docs]
def gaussian_logpdf(x, m, C):
r"""
Compute the log of the probability density function of a multivariate
Gaussian distribution.
Parameters
----------
x : array-like, shape (..., d)
The input samples.
m : array-like, shape (d,)
The mean vector of the Gaussian distribution.
C : array-like, shape (d, d)
The covariance matrix of the Gaussian distribution.
Returns
-------
pdf : array-like, shape (...,)
The probability density function evaluated at each sample.
"""
assert (
x.shape[-1] == m.shape[-1] == C.shape[-1] == C.shape[-2]
), "Dimension mismatch"
nx = get_backend(x, m, C)
d = m.shape[0]
diff = x - m
inv_C = nx.inv(C)
z = nx.sum(diff * (diff @ inv_C), axis=-1)
_, log_det_C = nx.slogdet(C)
return -0.5 * (d * np.log(2 * np.pi) + log_det_C + z)
[docs]
def gaussian_pdf(x, m, C):
r"""
Compute the probability density function of a multivariate
Gaussian distribution.
Parameters
----------
x : array-like, shape (..., d)
The input samples.
m : array-like, shape (d,)
The mean vector of the Gaussian distribution.
C : array-like, shape (d, d)
The covariance matrix of the Gaussian distribution.
Returns
-------
pdf : array-like, shape (...,)
The probability density function evaluated at each sample.
"""
return get_backend(x, m, C).exp(gaussian_logpdf(x, m, C))
[docs]
def gmm_pdf(x, m, C, w):
r"""
Compute the probability density function (PDF) of a
Gaussian Mixture Model (GMM) at given points.
Parameters
----------
x : array-like, shape (..., d)
The input samples.
m : array-like, shape (n_components, d)
The means of the Gaussian components.
C : array-like, shape (n_components, d, d)
The covariance matrices of the Gaussian components.
w : array-like, shape (n_components,)
The weights of the Gaussian components.
Returns
-------
out : array-like, shape (...,)
The PDF values at the given points.
"""
assert (
m.shape[0] == C.shape[0] == w.shape[0]
), "All GMM parameters must have the same amount of components"
nx = get_backend(x, m, C, w)
out = nx.zeros((x.shape[:-1]))
for k in range(m.shape[0]):
out = out + w[k] * gaussian_pdf(x, m[k], C[k])
return out
[docs]
def dist_bures_squared(m_s, m_t, C_s, C_t):
r"""
Compute the matrix of the squared Bures distances between the components of
two Gaussian Mixture Models (GMMs). Used to compute the GMM Optimal
Transport distance [69].
Parameters
----------
m_s : array-like, shape (k_s, d)
Mean vectors of the source GMM.
m_t : array-like, shape (k_t, d)
Mean vectors of the target GMM.
C_s : array-like, shape (k_s, d, d)
Covariance matrices of the source GMM.
C_t : array-like, shape (k_t, d, d)
Covariance matrices of the target GMM.
Returns
-------
dist : array-like, shape (k_s, k_t)
Matrix of squared Bures distances between the components of the source
and target GMMs.
References
----------
.. [69] Delon, J., & Desolneux, A. (2020). A Wasserstein-type distance in the space of Gaussian mixture models. SIAM Journal on Imaging Sciences, 13(2), 936-970.
"""
nx = get_backend(m_s, C_s, m_t, C_t)
assert m_s.shape[0] == C_s.shape[0], "Source GMM has different amount of components"
assert m_t.shape[0] == C_t.shape[0], "Target GMM has different amount of components"
assert (
m_s.shape[-1] == m_t.shape[-1] == C_s.shape[-1] == C_t.shape[-1]
), "All GMMs must have the same dimension"
D_means = dist(m_s, m_t, metric="sqeuclidean")
# C2[i, j] = Cs12[i] @ C_t[j] @ Cs12[i], shape (k_s, k_t, d, d)
Cs12 = nx.sqrtm(C_s) # broadcasts matrix sqrt over (k_s,)
C2 = nx.einsum("ikl,jlm,imn->ijkn", Cs12, C_t, Cs12)
C = nx.sqrtm(C2) # broadcasts matrix sqrt over (k_s, k_t)
# D_covs[i,j] = trace(C_s[i] + C_t[j] - 2C[i,j])
trace_C_s = nx.einsum("ikk->i", C_s)[:, None] # (k_s, 1)
trace_C_t = nx.einsum("ikk->i", C_t)[None, :] # (1, k_t)
D_covs = trace_C_s + trace_C_t # broadcasts to (k_s, k_t)
D_covs -= 2 * nx.einsum("ijkk->ij", C)
return nx.maximum(D_means + D_covs, 0)
[docs]
def gmm_ot_loss(m_s, m_t, C_s, C_t, w_s, w_t, log=False):
r"""
Compute the Gaussian Mixture Model (GMM) Optimal Transport distance between
two GMMs introduced in [69].
Parameters
----------
m_s : array-like, shape (k_s, d)
Mean vectors of the source GMM.
m_t : array-like, shape (k_t, d)
Mean vectors of the target GMM.
C_s : array-like, shape (k_s, d, d)
Covariance matrices of the source GMM.
C_t : array-like, shape (k_t, d, d)
Covariance matrices of the target GMM.
w_s : array-like, shape (k_s,)
Weights of the source GMM components.
w_t : array-like, shape (k_t,)
Weights of the target GMM components.
log: bool, optional (default=False)
If True, returns a dictionary containing the cost and dual variables.
Otherwise returns only the GMM optimal transportation cost.
Returns
-------
loss : float or array-like
The GMM-OT loss.
log : dict, optional
If input log is true, a dictionary containing the
cost and dual variables and exit status
References
----------
.. [69] Delon, J., & Desolneux, A. (2020). A Wasserstein-type distance in the space of Gaussian mixture models. SIAM Journal on Imaging Sciences, 13(2), 936-970.
"""
get_backend(m_s, C_s, w_s, m_t, C_t, w_t)
assert m_s.shape[0] == w_s.shape[0], "Source GMM has different amount of components"
assert m_t.shape[0] == w_t.shape[0], "Target GMM has different amount of components"
D = dist_bures_squared(m_s, m_t, C_s, C_t)
return emd2(w_s, w_t, D, log=log)
[docs]
def gmm_ot_plan(m_s, m_t, C_s, C_t, w_s, w_t, log=False):
r"""
Compute the Gaussian Mixture Model (GMM) Optimal Transport plan between
two GMMs introduced in [69].
Parameters
----------
m_s : array-like, shape (k_s, d)
Mean vectors of the source GMM.
m_t : array-like, shape (k_t, d)
Mean vectors of the target GMM.
C_s : array-like, shape (k_s, d, d)
Covariance matrices of the source GMM.
C_t : array-like, shape (k_t, d, d)
Covariance matrices of the target GMM.
w_s : array-like, shape (k_s,)
Weights of the source GMM components.
w_t : array-like, shape (k_t,)
Weights of the target GMM components.
log : bool, optional (default=False)
If True, returns a dictionary containing the cost and dual variables.
Otherwise returns only the GMM optimal transportation matrix.
Returns
-------
plan : array-like, shape (k_s, k_t)
The GMM-OT plan.
log : dict, optional
If input log is true, a dictionary containing the
cost and dual variables and exit status
References
----------
.. [69] Delon, J., & Desolneux, A. (2020). A Wasserstein-type distance in the space of Gaussian mixture models. SIAM Journal on Imaging Sciences, 13(2), 936-970.
"""
get_backend(m_s, C_s, w_s, m_t, C_t, w_t)
assert m_s.shape[0] == w_s.shape[0], "Source GMM has different amount of components"
assert m_t.shape[0] == w_t.shape[0], "Target GMM has different amount of components"
D = dist_bures_squared(m_s, m_t, C_s, C_t)
return emd(w_s, w_t, D, log=log)
[docs]
def gmm_ot_apply_map(
x, m_s, m_t, C_s, C_t, w_s, w_t, plan=None, method="bary", seed=None
):
r"""
Apply Gaussian Mixture Model (GMM) optimal transport (OT) mapping to input
data. The 'barycentric' mapping corresponds to the barycentric projection
of the GMM-OT plan, and is called T_bary in [69]. The 'random' mapping takes
for each input point a random pair (i,j) of components of the GMMs and
applied the Gaussian map, it is called T_rand in [69].
Parameters
----------
x : array-like, shape (n_samples, d)
Input data points.
m_s : array-like, shape (k_s, d)
Mean vectors of the source GMM components.
m_t : array-like, shape (k_t, d)
Mean vectors of the target GMM components.
C_s : array-like, shape (k_s, d, d)
Covariance matrices of the source GMM components.
C_t : array-like, shape (k_t, d, d)
Covariance matrices of the target GMM components.
w_s : array-like, shape (k_s,)
Weights of the source GMM components.
w_t : array-like, shape (k_t,)
Weights of the target GMM components.
plan : array-like, shape (k_s, k_t), optional
Optimal transport plan between the source and target GMM components.
If not provided, it will be computed internally.
method : {'bary', 'rand'}, optional
Method for applying the GMM OT mapping. 'bary' uses barycentric mapping,
while 'rand' uses random sampling. Default is 'bary'.
seed : int, optional
Seed for the random number generator. Only used when method='rand'.
Returns
-------
out : array-like, shape (n_samples, d)
Output data points after applying the GMM OT mapping.
References
----------
.. [69] Delon, J., & Desolneux, A. (2020). A Wasserstein-type distance in the space of Gaussian mixture models. SIAM Journal on Imaging Sciences, 13(2), 936-970.
"""
if plan is None:
plan = gmm_ot_plan(m_s, m_t, C_s, C_t, w_s, w_t)
nx = get_backend(x, m_s, m_t, C_s, C_t, w_s, w_t)
else:
nx = get_backend(x, m_s, m_t, C_s, C_t, w_s, w_t, plan)
k_s, k_t = m_s.shape[0], m_t.shape[0]
d = m_s.shape[1]
n_samples = x.shape[0]
if method == "bary":
out = nx.zeros(x.shape)
logpdf = nx.stack(
[gaussian_logpdf(x, m_s[k], C_s[k])[:, None] for k in range(k_s)]
)
# only need to compute for non-zero plan entries
for i, j in zip(*nx.where(plan > 0)):
Cs12 = nx.sqrtm(C_s[i])
Cs12inv = nx.inv(Cs12)
M0 = nx.sqrtm(Cs12 @ C_t[j] @ Cs12)
A = Cs12inv @ M0 @ Cs12inv
b = m_t[j] - A @ m_s[i]
# gaussian mapping between components i and j applied to x
T_ij_x = x @ A + b
z = w_s[:, None, None] * nx.exp(logpdf - logpdf[i][None, :, :])
denom = nx.sum(z, axis=0)
out = out + plan[i, j] * T_ij_x / denom
return out
else: # rand
# A[i, j] is the linear part of the gaussian mapping between components
# i and j, b[i, j] is the translation part
rng = np.random.RandomState(seed)
A = nx.zeros((k_s, k_t, d, d))
b = nx.zeros((k_s, k_t, d))
# only need to compute for non-zero plan entries
for i, j in zip(*nx.where(plan > 0)):
Cs12 = nx.sqrtm(C_s[i])
Cs12inv = nx.inv(Cs12)
M0 = nx.sqrtm(Cs12 @ C_t[j] @ Cs12)
A[i, j] = Cs12inv @ M0 @ Cs12inv
b[i, j] = m_t[j] - A[i, j] @ m_s[i]
logpdf = nx.stack(
[gaussian_logpdf(x, m_s[k], C_s[k]) for k in range(k_s)], axis=-1
)
# (n_samples, k_s)
out = nx.zeros(x.shape)
for i_sample in range(n_samples):
log_g = logpdf[i_sample]
log_diff = log_g[:, None] - log_g[None, :]
weighted_exp = w_s[:, None] * nx.exp(log_diff)
denom = nx.sum(weighted_exp, axis=0)[:, None] * nx.ones(plan.shape[1])
p_mat = plan / denom
p = p_mat.reshape(k_s * k_t) # stack line-by-line
# sample between 0 and k_s * k_t - 1
ij_mat = rng.choice(k_s * k_t, p=p)
i = ij_mat // k_t
j = ij_mat % k_t
out[i_sample] = A[i, j] @ x[i_sample] + b[i, j]
return out
[docs]
def gmm_ot_plan_density(x, y, m_s, m_t, C_s, C_t, w_s, w_t, plan=None, atol=1e-2):
"""
Compute the density of the Gaussian Mixture Model - Optimal Transport
coupling between GMMS at given points, as introduced in [69].
Given two arrays of points x and y, the function computes the density at
each point `(x[i], y[i])` of the product space.
Parameters
----------
x : array-like, shape (n, d)
Entry points in source space for density computation.
y : array-like, shape (m, d)
Entry points in target space for density computation.
m_s : array-like, shape (k_s, d)
The means of the source GMM components.
m_t : array-like, shape (k_t, d)
The means of the target GMM components.
C_s : array-like, shape (k_s, d, d)
The covariance matrices of the source GMM components.
C_t : array-like, shape (k_t, d, d)
The covariance matrices of the target GMM components.
w_s : array-like, shape (k_s,)
The weights of the source GMM components.
w_t : array-like, shape (k_t,)
The weights of the target GMM components.
plan : array-like, shape (k_s, k_t), optional
The optimal transport plan between the source and target GMMs.
If not provided, it will be computed using `gmm_ot_plan`.
atol : float, optional
The absolute tolerance used to determine the support of the GMM-OT
coupling.
Returns
-------
density : array-like, shape (n, m)
The density of the GMM-OT coupling between the two GMMs.
References
----------
.. [69] Delon, J., & Desolneux, A. (2020). A Wasserstein-type distance in the space of Gaussian mixture models. SIAM Journal on Imaging Sciences, 13(2), 936-970.
"""
assert (
x.shape[-1] == y.shape[-1]
), "x (n, d) and y (m, d) must have the same dimension d"
n, m = x.shape[0], y.shape[0]
nx = get_backend(x, y, m_s, m_t, C_s, C_t, w_s, w_t)
# hand-made d-variate meshgrid in ij indexing
xx = x[:, None, :] * nx.ones((1, m, 1)) # shapes (n, m, d)
yy = y[None, :, :] * nx.ones((n, 1, 1)) # shapes (n, m, d)
if plan is None:
plan = gmm_ot_plan(m_s, m_t, C_s, C_t, w_s, w_t)
def Tk0k1(k0, k1):
A, b = bures_wasserstein_mapping(m_s[k0], m_t[k1], C_s[k0], C_t[k1])
Tx = xx @ A + b
g = gaussian_pdf(xx, m_s[k0], C_s[k0])
out = plan[k0, k1] * g
norms = nx.norm(Tx - yy, axis=-1)
out = out * ((norms < atol) * 1.0)
return out
mat = nx.stack(
[
nx.stack([Tk0k1(k0, k1) for k1 in range(m_t.shape[0])])
for k0 in range(m_s.shape[0])
]
)
return nx.sum(mat, axis=(0, 1))