pypots/nn/modules/pyraformer/layers.py
"""
"""
# Created by Wenjie Du <wenjay.du@gmail.com>
# License: BSD-3-Clause
import math
import torch
import torch.fft
import torch.nn as nn
def get_mask(input_size, window_size, inner_size):
"""Get the attention mask of PAM-Naive"""
# Get the size of all layers
all_size = [input_size]
for i in range(len(window_size)):
layer_size = math.floor(all_size[i] / window_size[i])
all_size.append(layer_size)
seq_length = sum(all_size)
mask = torch.zeros(seq_length, seq_length)
# get intra-scale mask
inner_window = inner_size // 2
for layer_idx in range(len(all_size)):
start = sum(all_size[:layer_idx])
for i in range(start, start + all_size[layer_idx]):
left_side = max(i - inner_window, start)
right_side = min(i + inner_window + 1, start + all_size[layer_idx])
mask[i, left_side:right_side] = 1
# get inter-scale mask
for layer_idx in range(1, len(all_size)):
start = sum(all_size[:layer_idx])
for i in range(start, start + all_size[layer_idx]):
left_side = (start - all_size[layer_idx - 1]) + (i - start) * window_size[layer_idx - 1]
if i == (start + all_size[layer_idx] - 1):
right_side = start
else:
right_side = (start - all_size[layer_idx - 1]) + (i - start + 1) * window_size[layer_idx - 1]
mask[i, left_side:right_side] = 1
mask[left_side:right_side, i] = 1
mask = (1 - mask).bool()
return mask, all_size
def refer_points(all_sizes, window_size):
"""Gather features from PAM's pyramid sequences"""
input_size = all_sizes[0]
indexes = torch.zeros(input_size, len(all_sizes))
for i in range(input_size):
indexes[i][0] = i
former_index = i
for j in range(1, len(all_sizes)):
start = sum(all_sizes[:j])
inner_layer_idx = former_index - (start - all_sizes[j - 1])
former_index = start + min(inner_layer_idx // window_size[j - 1], all_sizes[j] - 1)
indexes[i][j] = former_index
indexes = indexes.unsqueeze(0).unsqueeze(3)
return indexes.long()
class ConvLayer(nn.Module):
def __init__(self, c_in, window_size):
super().__init__()
self.downConv = nn.Conv1d(
in_channels=c_in,
out_channels=c_in,
kernel_size=window_size,
stride=window_size,
)
self.norm = nn.BatchNorm1d(c_in)
self.activation = nn.ELU()
def forward(self, x):
x = self.downConv(x)
x = self.norm(x)
x = self.activation(x)
return x
class Bottleneck_Construct(nn.Module):
"""Bottleneck convolution CSCM"""
def __init__(self, d_model, window_size, d_inner):
super().__init__()
if not isinstance(window_size, list):
self.conv_layers = nn.ModuleList(
[
ConvLayer(d_inner, window_size),
ConvLayer(d_inner, window_size),
ConvLayer(d_inner, window_size),
]
)
else:
self.conv_layers = []
for i in range(len(window_size)):
self.conv_layers.append(ConvLayer(d_inner, window_size[i]))
self.conv_layers = nn.ModuleList(self.conv_layers)
self.up = nn.Linear(d_inner, d_model)
self.down = nn.Linear(d_model, d_inner)
self.norm = nn.LayerNorm(d_model)
def forward(self, enc_input):
temp_input = self.down(enc_input).permute(0, 2, 1)
all_inputs = []
for i in range(len(self.conv_layers)):
temp_input = self.conv_layers[i](temp_input)
all_inputs.append(temp_input)
all_inputs = torch.cat(all_inputs, dim=2).transpose(1, 2)
all_inputs = self.up(all_inputs)
all_inputs = torch.cat([enc_input, all_inputs], dim=1)
all_inputs = self.norm(all_inputs)
return all_inputs