Garve/scikit-bonus

View on GitHub
skbonus/timeseries/smoothing.py

Summary

Maintainability
A
0 mins
Test Coverage
"""Smooth time series data."""

from __future__ import annotations

from abc import ABC, abstractmethod

import numpy as np
from scipy.signal import convolve2d
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted


class Smoother(BaseEstimator, TransformerMixin, ABC):
    """
    Smooth the columns of an array by applying a convolution.

    This is the abstract base class of all smoothers. Subclasses implement
    `_set_sliding_window`, which has to store a one-dimensional array of kernel weights
    in the attribute `sliding_window_`. During `fit`, these weights are normalized to sum
    to one and reshaped to a column vector, so that every column of the input is
    convolved with the same kernel independently of the other columns.

    Parameters
    ----------
    window : int
        Size of the sliding window. Subclasses are expected to create a kernel with
        exactly `window` weights.

    mode : str
        Which convolution mode to use. Can be one of

            - "full": The output is the full discrete linear convolution of the inputs.
              The last `window - 1` rows are cut off in `transform`, so the output has
              as many rows as the input.
            - "valid": The output consists only of those elements that do not rely on the zero-padding.
            - "same": The output is the same size as the first input, centered with respect to the 'full' output.
    """

    def __init__(
        self,
        window: int,
        mode: str,
    ) -> None:
        """Initialize."""
        self.window = window
        self.mode = mode

    @abstractmethod
    def _set_sliding_window(self) -> None:
        """
        Calculate the sliding window.

        Implementations have to assign a one-dimensional numpy array of (unnormalized)
        weights to `self.sliding_window_`. The normalization is done in `fit`.

        Returns
        -------
        None
        """

    def fit(self, X: np.ndarray, y: None = None) -> Smoother:
        """
        Fit the estimator.

        The number of input columns is stored and the normalized sliding window is created.

        Parameters
        ----------
        X : np.ndarray
            Two-dimensional array. Only its number of columns is used.

        y : Ignored
            Not used, present here for API consistency by convention.

        Returns
        -------
        Smoother
            Fitted transformer.

        Raises
        ------
        ValueError
            If the weights of the sliding window sum to zero or to a non-finite value,
            because then they cannot be normalized.
        """
        X = check_array(X)
        # Set the attribute directly instead of calling the private
        # `BaseEstimator._check_n_features`, which newer scikit-learn versions may not provide.
        self.n_features_in_ = X.shape[1]

        self._set_sliding_window()
        total = self.sliding_window_.sum()
        if total == 0 or not np.isfinite(total):
            # Dividing by such a sum would silently fill the output with NaN or inf.
            raise ValueError(
                f"The weights of the sliding window sum to {total} and cannot be normalized. "
                "Please check the parameters of the smoother."
            )

        # Column vector of weights summing to one: convolve2d then smooths each column separately.
        self.sliding_window_ = self.sliding_window_.reshape(-1, 1) / total

        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        """
        Smooth each column of the input by convolving it with the fitted sliding window.

        Parameters
        ----------
        X : np.ndarray
            Two-dimensional array with the same number of columns as the array seen in `fit`.

        Returns
        -------
        np.ndarray
            The smoothed array. For the modes "full" and "same" it has the same shape as `X`.

        Raises
        ------
        ValueError
            If the number of columns of `X` differs from the one seen in `fit`.
        """
        check_is_fitted(self)
        X = check_array(X)
        if X.shape[1] != self.n_features_in_:
            raise ValueError(
                f"X has {X.shape[1]} features, but {self.__class__.__name__} "
                f"is expecting {self.n_features_in_} features as input."
            )

        convolution = convolve2d(X, self.sliding_window_, mode=self.mode)

        # A full convolution appends `window - 1` rows at the end. Drop them to keep
        # the output aligned with the input. For window == 1 nothing is appended, and
        # the slice `[:0]` would wrongly empty the array, hence the guard.
        if self.mode == "full" and self.window > 1:
            convolution = convolution[: -self.window + 1]

        return convolution


class GeneralGaussianSmoother(Smoother):
    """
    Smooth the columns of an array by applying a convolution with a generalized Gaussian curve.

    The unnormalized weight at integer offset `k` from the current row is
    `exp(-0.5 * abs(k / sig) ** (2 * p))`.

    Parameters
    ----------
    window : int, default=1
        Size of the sliding window, i.e. the number of weights. For an odd `window`, the
        weights are centered around the current row. For an even `window`, there is one more
        weight on the right side (positive offsets) than on the left side.

    p : float, default=1
        Parameter for the shape of the curve. p=1 yields a typical Gaussian curve while p=0.5 yields a Laplace curve, for example.

    sig : float, default=1
        Parameter for the standard deviation of the bell-shaped curve.

    tails : str, default="both"
        Which tails to use. Can be one of

            - "left": an input value only affects its own row and earlier rows
            - "right": an input value only affects its own row and later rows
            - "both"

    Examples
    --------
    >>> import numpy as np
    >>> X = np.array([0, 0, 0, 1, 0, 0, 0]).reshape(-1, 1)
    >>> GeneralGaussianSmoother().fit_transform(X)
    array([[0.],
           [0.],
           [0.],
           [1.],
           [0.],
           [0.],
           [0.]])

    >>> GeneralGaussianSmoother(window=5, p=1, sig=1).fit_transform(X)
    array([[0.        ],
           [0.05448868],
           [0.24420134],
           [0.40261995],
           [0.24420134],
           [0.05448868],
           [0.        ]])

    >>> GeneralGaussianSmoother(window=7, tails="right").fit_transform(X)
    array([[0.        ],
           [0.        ],
           [0.        ],
           [0.57045881],
           [0.34600076],
           [0.0772032 ],
           [0.00633722]])
    """

    def __init__(
        self,
        window: int = 1,
        p: float = 1,
        sig: float = 1,
        tails: str = "both",
    ) -> None:
        """Initialize."""
        super().__init__(window, mode="same")
        self.p = p
        self.sig = sig
        self.tails = tails

    def _set_sliding_window(self) -> None:
        """
        Calculate the sliding window.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the provided value for `tails` is not "left", "right" or "both".
        """
        # Validate first, so that an invalid value does not leave a window behind.
        if self.tails not in ("both", "left", "right"):
            raise ValueError(
                "tails keyword has to be one of 'both', 'left' or 'right'."
            )

        # Integer offsets of the weights relative to the current row, e.g. [-2, ..., 2] for
        # window=5 and [-1, 0, 1, 2] for window=4. Note that `-self.window // 2` floors the
        # negated window. The offset 0 sits at index (window - 1) // 2, which is the kernel
        # element that the "same" convolution mode aligns with the current row.
        offsets = np.arange(-self.window // 2 + 1, self.window // 2 + 1)
        self.sliding_window_ = np.exp(
            -0.5 * np.abs(offsets / self.sig) ** (2 * self.p)
        )

        # Mask by offset sign rather than by a position derived from `window // 2`: for even
        # windows the zero offset is not at index `window // 2`, so positional slicing would
        # keep a right-side weight for "left" and drop the central weight for "right".
        if self.tails == "left":
            self.sliding_window_[offsets > 0] = 0
        elif self.tails == "right":
            self.sliding_window_[offsets < 0] = 0


class ExponentialDecaySmoother(Smoother):
    """
    Smooth the columns of an array by convolving them with an exponentially decaying curve.

    A typical use case is modelling carry over effects in marketing mix models.

    For the lags `k = 0, 1, ..., window - 1`, the unnormalized weight of lag `k` is
    `strength ** (abs(k - peak) ** exponent)`. The convolution runs in "full" mode, so an
    input value only influences its own row and the rows after it.

    Parameters
    ----------
    window : int, default=1
        Size of the sliding window, i.e. the number of consecutive rows (starting with the
        row of the input value itself) over which an input value is spread.

    strength : float, default=0.0
        Fraction of the spending effect that is carried over. It is the base of the decay.

    peak : float, default=0.0
        Where the carryover effect peaks, measured in rows after the input value.

    exponent : float, default=1.0
        To further widen or narrow the carryover curve. A value of 1.0 yields a normal exponential decay.
        With values larger than 1.0, a super exponential decay can be achieved.

    Examples
    --------
    >>> import numpy as np
    >>> X = np.array([0, 0, 0, 1, 0, 0, 0]).reshape(-1, 1)
    >>> ExponentialDecaySmoother().fit_transform(X)
    array([[0.],
           [0.],
           [0.],
           [1.],
           [0.],
           [0.],
           [0.]])

    >>> ExponentialDecaySmoother(window=3, strength=0.5).fit_transform(X)
    array([[0.        ],
           [0.        ],
           [0.        ],
           [0.57142857],
           [0.28571429],
           [0.14285714],
           [0.        ]])

    >>> ExponentialDecaySmoother(window=3, strength=0.5, peak=1).fit_transform(X)
    array([[0.  ],
           [0.  ],
           [0.  ],
           [0.25],
           [0.5 ],
           [0.25],
           [0.  ]])
    """

    def __init__(
        self,
        window: int = 1,
        strength: float = 0.0,
        peak: float = 0.0,
        exponent: float = 1.0,
    ) -> None:
        """Store the hyperparameters; the convolution mode is always "full"."""
        super().__init__(window, mode="full")
        self.strength = strength
        self.peak = peak
        self.exponent = exponent

    def _set_sliding_window(self) -> None:
        """
        Compute the unnormalized decay weights and store them in `sliding_window_`.

        Returns
        -------
        None
        """
        # One lag per row of the window: 0, 1, ..., window - 1.
        lags = np.arange(self.window)
        # The further a lag is from the peak, the larger the power the strength is raised to.
        distance_to_peak = np.abs(lags - self.peak)
        powers = distance_to_peak ** self.exponent
        self.sliding_window_ = self.strength ** powers