"""

"""

# Created by Jun Wang <jwangfx@connect.ust.hk> and Wenjie Du <wenjay.du@gmail.com>
# License: BSD-3-Clause

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


def rbf_kernel(T, length_scale):
    xs = torch.arange(T).float()
    xs_in = torch.unsqueeze(xs, 0)
    xs_out = torch.unsqueeze(xs, 1)
    distance_matrix = (xs_in - xs_out) ** 2
    distance_matrix_scaled = distance_matrix / length_scale**2
    kernel_matrix = torch.exp(-distance_matrix_scaled)
    return kernel_matrix
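

# Illustrative usage sketch (arbitrary T and length_scale):
#   >>> K = rbf_kernel(4, 1.0)
#   >>> K.shape
#   torch.Size([4, 4])
#   >>> torch.allclose(K[0, 1], torch.exp(torch.tensor(-1.0)))
#   True
# K[i, j] = exp(-(i - j)^2 / length_scale^2), so the diagonal is all ones.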


def diffusion_kernel(T, length_scale):
    assert length_scale < 0.5, (
        "length_scale has to be smaller than 0.5 for the "
        "kernel matrix to be diagonally dominant"
    )
    sigmas = torch.ones(T, T) * length_scale
    # keep only the tridiagonal band (the main diagonal and the two adjacent
    # diagonals); torch.diagonal extracts diagonals and cannot build this matrix
    sigmas_tridiag = torch.tril(torch.triu(sigmas, diagonal=-1), diagonal=1)
    kernel_matrix = sigmas_tridiag + torch.eye(T) * (1.0 - length_scale)
    return kernel_matrix
    return kernel_matrix
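

# Illustrative sketch: the result is tridiagonal, e.g. for T=4, length_scale=0.3:
#   >>> K = diffusion_kernel(4, 0.3)
#   >>> torch.allclose(K, torch.tensor([[1.0, 0.3, 0.0, 0.0],
#   ...                                 [0.3, 1.0, 0.3, 0.0],
#   ...                                 [0.0, 0.3, 1.0, 0.3],
#   ...                                 [0.0, 0.0, 0.3, 1.0]]))
#   True
# length_scale < 0.5 keeps the main diagonal (1.0) larger than the per-row sum
# of off-diagonal entries (at most 2 * length_scale), hence the assert above.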


def matern_kernel(T, length_scale):
    xs = torch.arange(T).float()
    xs_in = torch.unsqueeze(xs, 0)
    xs_out = torch.unsqueeze(xs, 1)
    distance_matrix = torch.abs(xs_in - xs_out)
    # np.sqrt is used because torch.sqrt does not accept a plain Python float
    distance_matrix_scaled = distance_matrix / np.sqrt(length_scale)
    kernel_matrix = torch.exp(-distance_matrix_scaled)
    return kernel_matrix
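

# Illustrative sketch: this is the Matern nu=1/2 (exponential) kernel with
# effective length scale sqrt(length_scale):
#   >>> K = matern_kernel(4, 1.0)
#   >>> torch.allclose(K[0, 1], torch.exp(torch.tensor(-1.0)))
#   True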


def cauchy_kernel(T, sigma, length_scale):
    xs = torch.arange(T).float()
    xs_in = torch.unsqueeze(xs, 0)
    xs_out = torch.unsqueeze(xs, 1)
    distance_matrix = (xs_in - xs_out) ** 2
    distance_matrix_scaled = distance_matrix / length_scale**2
    kernel_matrix = sigma / (distance_matrix_scaled + 1.0)

    alpha = 0.001
    eye = torch.eye(kernel_matrix.shape[-1])
    return kernel_matrix + alpha * eye
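

# Illustrative sketch: a Cauchy kernel sigma / ((i - j)^2 / length_scale^2 + 1)
# plus a small diagonal jitter alpha * I, presumably to keep downstream
# Cholesky factorizations numerically stable:
#   >>> K = cauchy_kernel(4, sigma=1.0, length_scale=1.0)
#   >>> torch.allclose(K.diagonal(), torch.full((4,), 1.001))
#   True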


def make_nn(input_size, output_size, hidden_sizes):
    """This function used to creates fully connected neural network.

    Parameters
    ----------
    input_size : int,
        the dimension of input embeddings

    output_size : int,
        the dimension of out embeddings

    hidden_sizes : tuple,
        the tuple of hidden layer sizes, and the tuple length sets the number of hidden layers

    Returns
    -------
    output: tensor
        the processing embeddings
    """
    layers = []
    for i in range(len(hidden_sizes)):
        if i == 0:
            layers.append(
                nn.Linear(in_features=input_size, out_features=hidden_sizes[i])
            )
        else:
            layers.append(
                nn.Linear(in_features=hidden_sizes[i - 1], out_features=hidden_sizes[i])
            )
        layers.append(nn.ReLU())
    layers.append(nn.Linear(in_features=hidden_sizes[-1], out_features=output_size))
    return nn.Sequential(*layers)
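

# Illustrative sketch: make_nn(10, 5, (32, 32)) builds
# Linear(10, 32) -> ReLU -> Linear(32, 32) -> ReLU -> Linear(32, 5),
# i.e. each hidden layer is followed by a ReLU and the output layer stays linear:
#   >>> net = make_nn(10, 5, (32, 32))
#   >>> net(torch.randn(2, 10)).shape
#   torch.Size([2, 5])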


class CustomConv1d(torch.nn.Conv1d):
    def __init__(self, in_channels, out_channels, kernel_size, padding):
        super().__init__(in_channels, out_channels, kernel_size, padding=padding)

    def forward(self, x):
        if len(x.shape) > 2:
            # move the channel dim to position 1, as Conv1d expects,
            # e.g. (batch, time, channels) -> (batch, channels, time)
            shape = list(np.arange(len(x.shape)))
            new_shape = [0, shape[-1]] + shape[1:-1]
            out = super().forward(x.permute(*new_shape))
            shape = list(np.arange(len(out.shape)))
            new_shape = [0, shape[-1]] + shape[1:-1]
            if self.kernel_size[0] % 2 == 0:
                # an even kernel with padding=kernel_size//2 produces one extra
                # output step; trim it with a negative pad
                out = F.pad(out, (0, -1), "constant", 0)
            # move the channel dim back to the last position
            return out.permute(*new_shape)

        return super().forward(x)
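

# Illustrative sketch: CustomConv1d takes channels-last input (batch, time,
# features), unlike the channels-first layout nn.Conv1d expects, and preserves
# the time length even for even kernel sizes by trimming the extra output step:
#   >>> conv = CustomConv1d(8, 16, kernel_size=4, padding=2)
#   >>> conv(torch.randn(2, 24, 8)).shape
#   torch.Size([2, 24, 16])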


def make_cnn(input_size, output_size, hidden_sizes, kernel_size=3):
    """This function used to construct neural network consisting of
       one 1d-convolutional layer that utilizes temporal dependencies,
       fully connected network

    Parameters
    ----------
    input_size : int,
        the dimension of input embeddings

    output_size : int,
        the dimension of out embeddings

    hidden_sizes : tuple,
        the tuple of hidden layer sizes, and the tuple length sets the number of hidden layers,

    kernel_size : int
        kernel size for convolutional layer

    Returns
    -------
    output: tensor
        the processing embeddings
    """
    padding = kernel_size // 2

    cnn_layer = CustomConv1d(
        input_size, hidden_sizes[0], kernel_size=kernel_size, padding=padding
    )
    layers = [cnn_layer]

    for prev_size, next_size in zip(hidden_sizes, hidden_sizes[1:]):
        layers.extend([nn.Linear(prev_size, next_size), nn.ReLU()])
    if isinstance(output_size, tuple):
        net = nn.Sequential(*layers)
        return [net] + [nn.Linear(hidden_sizes[-1], o) for o in output_size]

    layers.append(nn.Linear(hidden_sizes[-1], output_size))
    return nn.Sequential(*layers)
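

# Illustrative sketch: with a tuple output_size, make_cnn returns the shared
# trunk plus one linear head per element, which is how GpvaeEncoder below gets
# its mean and covariance heads:
#   >>> net, mu_head, logvar_head = make_cnn(10, (4, 8), (64, 64), kernel_size=3)
#   >>> h = net(torch.randn(2, 24, 10))  # shared features, shape (2, 24, 64)
#   >>> mu_head(h).shape, logvar_head(h).shape
#   (torch.Size([2, 24, 4]), torch.Size([2, 24, 8]))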


class GpvaeEncoder(nn.Module):
    def __init__(self, input_size, z_size, hidden_sizes=(128, 128), window_size=24):
        """This module is an encoder with 1d-convolutional network and multivariate Normal posterior used by GP-VAE with
        proposed banded covariance matrix

        Parameters
        ----------
        input_size : int,
            the feature dimension of the input

        z_size : int,
            the feature dimension of the output latent embedding

        hidden_sizes : tuple,
            the tuple of the hidden layer sizes, and the tuple length sets the number of hidden layers

        window_size : int
            the kernel size for the Conv1D layer
        """
        super().__init__()
        self.z_size = int(z_size)
        self.input_size = input_size
        self.net, self.mu_layer, self.logvar_layer = make_cnn(
            input_size, (z_size, z_size * 2), hidden_sizes, window_size
        )

    def forward(self, x):
        mapped = self.net(x)
        batch_size = mapped.size(0)
        time_length = mapped.size(1)

        num_dim = len(mapped.shape)
        mu = self.mu_layer(mapped)
        logvar = self.logvar_layer(mapped)
        mapped_mean = torch.transpose(mu, num_dim - 1, num_dim - 2)
        mapped_covar = torch.transpose(logvar, num_dim - 1, num_dim - 2)
        mapped_covar = torch.sigmoid(mapped_covar)
        mapped_reshaped = mapped_covar.reshape(batch_size, self.z_size, 2 * time_length)

        # Assemble a sparse upper-bidiagonal Cholesky factor of the precision
        # matrix: the first time_length values of mapped_reshaped fill the main
        # diagonal, the next time_length - 1 values fill the superdiagonal.
        dense_shape = [batch_size, self.z_size, time_length, time_length]
        idxs_1 = np.repeat(np.arange(batch_size), self.z_size * (2 * time_length - 1))
        idxs_2 = np.tile(
            np.repeat(np.arange(self.z_size), (2 * time_length - 1)), batch_size
        )
        idxs_3 = np.tile(
            np.concatenate([np.arange(time_length), np.arange(time_length - 1)]),
            batch_size * self.z_size,
        )
        idxs_4 = np.tile(
            np.concatenate([np.arange(time_length), np.arange(1, time_length)]),
            batch_size * self.z_size,
        )
        idxs_all = np.stack([idxs_1, idxs_2, idxs_3, idxs_4], axis=1)

        mapped_values = mapped_reshaped[:, :, :-1].reshape(-1)
        prec_sparse = torch.sparse_coo_tensor(
            torch.LongTensor(idxs_all).t().to(mapped.device),
            mapped_values.to(mapped.device),
            dense_shape,
        )
        prec_sparse = prec_sparse.coalesce()
        prec_tril = prec_sparse.to_dense()
        # add the identity so the diagonal (sigmoid outputs in (0, 1)) is
        # bounded away from zero, then invert the upper-triangular precision
        # factor to obtain the covariance factor
        eye = (
            torch.eye(prec_tril.shape[-1])
            .unsqueeze(0)
            .repeat(prec_tril.shape[0], prec_tril.shape[1], 1, 1)
            .to(mapped.device)
        )
        prec_tril = prec_tril + eye
        cov_tril = torch.linalg.solve_triangular(prec_tril, eye, upper=True)
        # zero out any non-finite entries the inversion may produce
        cov_tril = torch.where(
            torch.isfinite(cov_tril), cov_tril, torch.zeros_like(cov_tril)
        ).to(mapped.device)

        num_dim = len(cov_tril.shape)
        # transpose the upper factor into the lower-triangular scale_tril
        cov_tril_lower = torch.transpose(cov_tril, num_dim - 1, num_dim - 2)

        z_dist = torch.distributions.MultivariateNormal(
            loc=mapped_mean, scale_tril=cov_tril_lower
        )
        return z_dist
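

# Illustrative sketch (arbitrary sizes): the encoder returns a
# MultivariateNormal with batch shape (batch, z_size) and event shape
# (time_length,), built from the banded Cholesky factor above:
#   >>> enc = GpvaeEncoder(input_size=10, z_size=4, hidden_sizes=(64, 64), window_size=3)
#   >>> z_dist = enc(torch.randn(2, 24, 10))
#   >>> z_dist.rsample().shape
#   torch.Size([2, 4, 24])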


class GpvaeDecoder(nn.Module):
    def __init__(self, input_size, output_size, hidden_sizes=(256, 256)):
        """This module is a decoder with Gaussian output distribution.

        Parameters
        ----------
        output_size : int,
            the feature dimension of the output

        hidden_sizes: tuple
            the tuple of hidden layer sizes, and the tuple length sets the number of hidden layers.
        """
        super().__init__()
        self.net = make_nn(input_size, output_size, hidden_sizes)

    def forward(self, x):
        mu = self.net(x)
        # torch.distributions.Normal takes the standard deviation as its
        # second argument; a fixed unit scale is used here
        scale = torch.ones_like(mu)
        return torch.distributions.Normal(mu, scale)
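

if __name__ == "__main__":
    # Minimal smoke test (illustrative only, with arbitrary sizes): encode a
    # random batch, sample a latent trajectory, and decode it back.
    x = torch.randn(2, 24, 10)  # (batch, time, features)
    encoder = GpvaeEncoder(input_size=10, z_size=4, hidden_sizes=(64, 64), window_size=3)
    decoder = GpvaeDecoder(input_size=4, output_size=10, hidden_sizes=(64, 64))
    z_dist = encoder(x)
    z = z_dist.rsample()  # (batch, z_size, time) = (2, 4, 24)
    x_dist = decoder(z.transpose(1, 2))  # decoder expects channels last
    print(z.shape, x_dist.mean.shape)  # (2, 4, 24) and (2, 24, 10)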