Wasserstein 2 Minibatch GAN with PyTorch

In this example we train a Wasserstein GAN using Wasserstein 2 on minibatches as a distribution fitting term.

We want to train a generator \(G_\theta\) that generates realistic data from random noise drawn form a Gaussian \(\mu_n\) distribution so that the data is indistinguishable from true data in the data distribution \(\mu_d\). To this end Wasserstein GAN [Arjovsky2017] aim at optimizing the parameters \(\theta\) of the generator with the following optimization problem:

\[\min_{\theta} W(\mu_d,G_\theta\#\mu_n)\]

In practice we do not have access to the full distribution \(\mu_d\) but samples and we cannot compute the Wasserstein distance for large dataset. [Arjovsky2017] proposed to approximate the dual potential of Wasserstein 1 with a neural network recovering an optimization problem similar to GAN. In this example we will optimize the expectation of the Wasserstein distance over minibatches at each iterations as proposed in [Genevay2018]. Optimizing the Minibatches of the Wasserstein distance has been studied in [Fatras2019].

[Arjovsky2017] Arjovsky, M., Chintala, S., & Bottou, L. (2017, July). Wasserstein generative adversarial networks. In International conference on machine learning (pp. 214-223). PMLR.

[Genevay2018] Genevay, Aude, Gabriel Peyré, and Marco Cuturi. “Learning generative models with sinkhorn divergences.” International Conference on Artificial Intelligence and Statistics. PMLR, 2018.

[Fatras2019] Fatras, K., Zine, Y., Flamary, R., Gribonval, R., & Courty, N. (2020, June). Learning with minibatch Wasserstein: asymptotic and gradient properties. In the 23nd International Conference on Artificial Intelligence and Statistics (Vol. 108).

# Author: Remi Flamary <remi.flamary@polytechnique.edu>
#
# License: MIT License

# sphinx_gallery_thumbnail_number = 3

import numpy as np
import matplotlib.pyplot as pl
import matplotlib.animation as animation
import torch
from torch import nn
import ot

Data generation

torch.manual_seed(1)
sigma = 0.1
n_dims = 2
n_features = 2


def get_data(n_samples):
    c = torch.rand(size=(n_samples, 1))
    angle = c * 2 * np.pi
    x = torch.cat((torch.cos(angle), torch.sin(angle)), 1)
    x += torch.randn(n_samples, 2) * sigma
    return x

Plot data

# plot the distributions
x = get_data(500)
pl.figure(1)
pl.scatter(x[:, 0], x[:, 1], label="Data samples from $\mu_d$", alpha=0.5)
pl.title("Data distribution")
pl.legend()
Data distribution
<matplotlib.legend.Legend object at 0x7982015337c0>

Generator Model

# define the MLP model
class Generator(torch.nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.fc1 = nn.Linear(n_features, 200)
        self.fc2 = nn.Linear(200, 500)
        self.fc3 = nn.Linear(500, n_dims)
        self.relu = torch.nn.ReLU()  # instead of Heaviside step fn

    def forward(self, x):
        output = self.fc1(x)
        output = self.relu(output)  # instead of Heaviside step fn
        output = self.fc2(output)
        output = self.relu(output)
        output = self.fc3(output)
        return output

Training the model

G = Generator()
optimizer = torch.optim.RMSprop(G.parameters(), lr=0.00019, eps=1e-5)

# number of iteration and size of the batches
n_iter = 200  # set to 200 for doc build but 1000 is better ;)
size_batch = 500

# generate statis samples to see their trajectory along training
n_visu = 100
xnvisu = torch.randn(n_visu, n_features)
xvisu = torch.zeros(n_iter, n_visu, n_dims)

ab = torch.ones(size_batch) / size_batch
losses = []


for i in range(n_iter):
    # generate noise samples
    xn = torch.randn(size_batch, n_features)

    # generate data samples
    xd = get_data(size_batch)

    # generate sample along iterations
    xvisu[i, :, :] = G(xnvisu).detach()

    # generate samples and compte distance matrix
    xg = G(xn)
    M = ot.dist(xg, xd)

    loss = ot.emd2(ab, ab, M)
    losses.append(float(loss.detach()))

    if i % 10 == 0:
        print("Iter: {:3d}, loss={}".format(i, losses[-1]))

    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

    del M

pl.figure(2)
pl.semilogy(losses)
pl.grid()
pl.title("Wasserstein distance")
pl.xlabel("Iterations")
Wasserstein distance
Iter:   0, loss=0.9009847640991211
Iter:  10, loss=0.10964284837245941
Iter:  20, loss=0.04564394801855087
Iter:  30, loss=0.03516071289777756
Iter:  40, loss=0.05013977363705635
Iter:  50, loss=0.058588504791259766
Iter:  60, loss=0.03730057179927826
Iter:  70, loss=0.04171676188707352
Iter:  80, loss=0.03168988972902298
Iter:  90, loss=0.031197285279631615
Iter: 100, loss=0.03596879169344902
Iter: 110, loss=0.03272819146513939
Iter: 120, loss=0.032379165291786194
Iter: 130, loss=0.03959248960018158
Iter: 140, loss=0.029337508603930473
Iter: 150, loss=0.05796702206134796
Iter: 160, loss=0.034939464181661606
Iter: 170, loss=0.022607704624533653
Iter: 180, loss=0.04347885772585869
Iter: 190, loss=0.1164197325706482

Text(0.5, 23.52222222222222, 'Iterations')

Plot trajectories of generated samples along iterations

pl.figure(3, (10, 10))

ivisu = [0, 10, 25, 50, 75, 125, 15, 175, 199]

for i in range(9):
    pl.subplot(3, 3, i + 1)
    pl.scatter(xd[:, 0], xd[:, 1], label="Data samples from $\mu_d$", alpha=0.1)
    pl.scatter(
        xvisu[ivisu[i], :, 0],
        xvisu[ivisu[i], :, 1],
        label="Data samples from $G\#\mu_n$",
        alpha=0.5,
    )
    pl.xticks(())
    pl.yticks(())
    pl.title("Iter. {}".format(ivisu[i]))
    if i == 0:
        pl.legend()
Iter. 0, Iter. 10, Iter. 25, Iter. 50, Iter. 75, Iter. 125, Iter. 15, Iter. 175, Iter. 199

Animate trajectories of generated samples along iteration

pl.figure(4, (8, 8))


def _update_plot(i):
    pl.clf()
    pl.scatter(xd[:, 0], xd[:, 1], label="Data samples from $\mu_d$", alpha=0.1)
    pl.scatter(
        xvisu[i, :, 0], xvisu[i, :, 1], label="Data samples from $G\#\mu_n$", alpha=0.5
    )
    pl.xticks(())
    pl.yticks(())
    pl.xlim((-1.5, 1.5))
    pl.ylim((-1.5, 1.5))
    pl.title("Iter. {}".format(i))
    return 1


i = 0
pl.scatter(xd[:, 0], xd[:, 1], label="Data samples from $\mu_d$", alpha=0.1)
pl.scatter(
    xvisu[i, :, 0], xvisu[i, :, 1], label="Data samples from $G\#\mu_n$", alpha=0.5
)
pl.xticks(())
pl.yticks(())
pl.xlim((-1.5, 1.5))
pl.ylim((-1.5, 1.5))
pl.title("Iter. {}".format(ivisu[i]))


ani = animation.FuncAnimation(
    pl.gcf(), _update_plot, n_iter, interval=100, repeat_delay=2000
)

Generate and visualize data

size_batch = 500
xd = get_data(size_batch)
xn = torch.randn(size_batch, 2)
x = G(xn).detach().numpy()

pl.figure(5)
pl.scatter(xd[:, 0], xd[:, 1], label="Data samples from $\mu_d$", alpha=0.5)
pl.scatter(x[:, 0], x[:, 1], label="Data samples from $G\#\mu_n$", alpha=0.5)
pl.title("Sources and Target distributions")
pl.legend()
Sources and Target distributions
<matplotlib.legend.Legend object at 0x7982001bbf10>

Total running time of the script: (1 minutes 9.469 seconds)

Gallery generated by Sphinx-Gallery