pypots/forecasting/bttf/submodules.py
"""
"""
# Created by Wenjie Du <wenjay.du@gmail.com>
# License: BSD-3-Clause
import numpy as np
from numpy.linalg import cholesky as cholesky_lower
from numpy.linalg import inv as inv
from numpy.linalg import solve as solve
from numpy.random import multivariate_normal as mvnrnd
from numpy.random import normal as normrnd
from scipy.linalg import cholesky as cholesky_upper
from scipy.linalg import khatri_rao as kr_prod
from scipy.linalg import solve_triangular as solve_ut
from scipy.stats import invwishart
from scipy.stats import wishart
def mvnrnd_pre(mu, Lambda):
src = normrnd(size=(mu.shape[0],))
return (
solve_ut(
cholesky_upper(Lambda, overwrite_a=True, check_finite=False),
src,
lower=False,
check_finite=False,
overwrite_b=True,
)
+ mu
)
def cov_mat(mat, mat_bar):
mat = mat - mat_bar
return mat.T @ mat
def ten2mat(tensor, mode):
return np.reshape(np.moveaxis(tensor, mode, 0), (tensor.shape[mode], -1), order="F")
def sample_factor_u(tau_sparse_tensor, tau_ind, U, V, X, beta0=1):
"""Sampling M-by-R factor matrix U and its hyper-parameters (mu_u, Lambda_u)."""
dim1, rank = U.shape
U_bar = np.mean(U, axis=0)
temp = dim1 / (dim1 + beta0)
var_mu_hyper = temp * U_bar
var_U_hyper = inv(np.eye(rank) + cov_mat(U, U_bar) + temp * beta0 * np.outer(U_bar, U_bar))
var_Lambda_hyper = wishart.rvs(df=dim1 + rank, scale=var_U_hyper)
var_mu_hyper = mvnrnd_pre(var_mu_hyper, (dim1 + beta0) * var_Lambda_hyper)
var1 = kr_prod(X, V).T
var2 = kr_prod(var1, var1)
var3 = (var2 @ ten2mat(tau_ind, 0).T).reshape([rank, rank, dim1]) + var_Lambda_hyper[:, :, None]
var4 = var1 @ ten2mat(tau_sparse_tensor, 0).T + (var_Lambda_hyper @ var_mu_hyper)[:, None]
for i in range(dim1):
U[i, :] = mvnrnd_pre(solve(var3[:, :, i], var4[:, i]), var3[:, :, i])
return U
def sample_factor_v(tau_sparse_tensor, tau_ind, U, V, X, beta0=1):
"""Sampling N-by-R factor matrix V and its hyper-parameters (mu_v, Lambda_v)."""
dim2, rank = V.shape
V_bar = np.mean(V, axis=0)
temp = dim2 / (dim2 + beta0)
var_mu_hyper = temp * V_bar
var_V_hyper = inv(np.eye(rank) + cov_mat(V, V_bar) + temp * beta0 * np.outer(V_bar, V_bar))
var_Lambda_hyper = wishart.rvs(df=dim2 + rank, scale=var_V_hyper)
var_mu_hyper = mvnrnd_pre(var_mu_hyper, (dim2 + beta0) * var_Lambda_hyper)
var1 = kr_prod(X, U).T
var2 = kr_prod(var1, var1)
var3 = (var2 @ ten2mat(tau_ind, 1).T).reshape([rank, rank, dim2]) + var_Lambda_hyper[:, :, None]
var4 = var1 @ ten2mat(tau_sparse_tensor, 1).T + (var_Lambda_hyper @ var_mu_hyper)[:, None]
for j in range(dim2):
V[j, :] = mvnrnd_pre(solve(var3[:, :, j], var4[:, j]), var3[:, :, j])
return V
def mnrnd(M, U, V):
"""
Generate matrix normal distributed random matrix.
M is a m-by-n matrix, U is a m-by-m matrix, and V is a n-by-n matrix.
"""
dim1, dim2 = M.shape
X0 = np.random.randn(dim1, dim2)
P = cholesky_lower(U)
Q = cholesky_lower(V)
return M + P @ X0 @ Q.T
def sample_var_coefficient(X, time_lags):
dim, rank = X.shape
d = time_lags.shape[0]
tmax = np.max(time_lags)
Z_mat = X[tmax:dim, :]
Q_mat = np.zeros((dim - tmax, rank * d))
for k in range(d):
Q_mat[:, k * rank : (k + 1) * rank] = X[tmax - time_lags[k] : dim - time_lags[k], :]
var_Psi0 = np.eye(rank * d) + Q_mat.T @ Q_mat
var_Psi = inv(var_Psi0)
var_M = var_Psi @ Q_mat.T @ Z_mat
var_S = np.eye(rank) + Z_mat.T @ Z_mat - var_M.T @ var_Psi0 @ var_M
Sigma = invwishart.rvs(df=rank + dim - tmax, scale=var_S)
return mnrnd(var_M, var_Psi, Sigma), Sigma
def sample_factor_x(tau_sparse_tensor, tau_ind, time_lags, U, V, X, A, Lambda_x):
"""Sampling T-by-R factor matrix X."""
dim3, rank = X.shape
tmax = np.max(time_lags)
tmin = np.min(time_lags)
d = time_lags.shape[0]
A0 = np.dstack([A] * d)
for k in range(d):
A0[k * rank : (k + 1) * rank, :, k] = 0
mat0 = Lambda_x @ A.T
mat1 = np.einsum("kij, jt -> kit", A.reshape([d, rank, rank]), Lambda_x)
mat2 = np.einsum("kit, kjt -> ij", mat1, A.reshape([d, rank, rank]))
var1 = kr_prod(V, U).T
var2 = kr_prod(var1, var1)
var3 = (var2 @ ten2mat(tau_ind, 2).T).reshape([rank, rank, dim3]) + Lambda_x[:, :, None]
var4 = var1 @ ten2mat(tau_sparse_tensor, 2).T
for t in range(dim3):
Mt = np.zeros((rank, rank))
Nt = np.zeros(rank)
Qt = mat0 @ X[t - time_lags, :].reshape(rank * d)
index = list(range(0, d))
if dim3 - tmax <= t < dim3 - tmin:
index = list(np.where(t + time_lags < dim3))[0]
elif t < tmax:
Qt = np.zeros(rank)
index = list(np.where(t + time_lags >= tmax))[0]
if t < dim3 - tmin:
Mt = mat2.copy()
temp = np.zeros((rank * d, len(index)))
n = 0
for k in index:
temp[:, n] = X[t + time_lags[k] - time_lags, :].reshape(rank * d)
n += 1
temp0 = X[t + time_lags[index], :].T - np.einsum("ijk, ik -> jk", A0[:, :, index], temp)
Nt = np.einsum("kij, jk -> i", mat1[index, :, :], temp0)
var3[:, :, t] = var3[:, :, t] + Mt
if t < tmax:
var3[:, :, t] = var3[:, :, t] - Lambda_x + np.eye(rank)
X[t, :] = mvnrnd_pre(solve(var3[:, :, t], var4[:, t] + Nt + Qt), var3[:, :, t])
return X
def compute_mape(var, var_hat):
return np.sum(np.abs(var - var_hat) / var) / var.shape[0]
def compute_rmse(var, var_hat):
return np.sqrt(np.sum((var - var_hat) ** 2) / var.shape[0])
def ar4cast(A, X, Sigma, time_lags, multi_step):
dim, rank = X.shape
d = time_lags.shape[0]
X_new = np.append(X, np.zeros((multi_step, rank)), axis=0)
for t in range(multi_step):
var = A.T @ X_new[dim + t - time_lags, :].reshape(rank * d)
X_new[dim + t, :] = mvnrnd(var, Sigma)
return X_new