paradoxysm/nnrf

View on GitHub
nnrf/utils/_estimator.py

Summary

Maintainability
B
5 hrs
Test Coverage
import numpy as np
from tqdm import trange
from copy import copy
from abc import ABC, abstractmethod

from nnrf.utils import calculate_batch, create_random_state, \
                        BatchDataset, one_hot, decode, check_XY
from nnrf.analysis import get_metrics
from nnrf.ml import get_loss

from nnrf.utils._base import Base

class BaseEstimator(Base, ABC):
    """
    Base Estimator Class.
    Implements common methods for estimators.

    Parameters
    ----------
    verbose : int, default=0
        Verbosity of estimator; higher values result in
        more verbose output.

    warm_start : bool, default=False
        Determines warm starting to allow training to pick
        up from previous training sessions.

    metric : str, Metric, or None, default='accuracy'
        Metric for estimator score.

    Attributes
    ----------
    fitted_ : bool
        True if the model has been deemed trained and
        ready to predict new data.

    n_classes_ : int
        Number of classes.

    n_features : int
        Number of features accepted as input.
    """
    def __init__(self, verbose=0, warm_start=False, metric='accuracy'):
        self.verbose = verbose
        self.warm_start = warm_start
        if metric is None : metric = 'accuracy'
        self.metric = get_metrics(metric)
        self.fitted_ = False
        self.n_classes_ = None
        self.n_features_ = None

    @abstractmethod
    def fit(self, X, Y, *args, weights=None, **kwargs):
        """
        Train the model on the given data and labels.

        Parameters
        ----------
        X : array-like, shape=(n_samples, n_features)
            Training data.

        Y : array-like, shape=(n_samples,)
            Target labels as integers.

        weights : array-like, shape=(n_samples,), default=None
            Sample weights. If None, then samples are equally weighted.

        Returns
        -------
        self : Base
            Fitted estimator.
        """
        raise NotImplementedError("No fit function implemented")

    def predict(self, X):
        """
        Predict classes for each sample in `X`.

        Parameters
        ----------
        X : array-like, shape=(n_samples, n_features)
            Data to predict.

        Returns
        -------
        Y : array-like, shape=(n_samples,)
            Predicted labels.
        """
        pred = self.predict_proba(X)
        pred = np.argmax(pred, axis=1)
        return pred

    def predict_log_proba(self, X):
        """
        Predict class log-probabilities for each sample in `X`.

        Parameters
        ----------
        X : array-like, shape=(n_samples, n_features)
            Data to predict.

        Returns
        -------
        proba : array-like, shape=(n_samples, n_classes)
            Class log-probabilities of input data.
            The order of classes is in sorted ascending order.
        """
        return np.log(self.predict_proba(X))

    @abstractmethod
    def predict_proba(self, X, *args, **kwargs):
        """
        Predict class probabilities for each sample in `X`.

        Parameters
        ----------
        X : array-like, shape=(n_samples, n_features)
            Data to predict.

        Returns
        -------
        proba : array-like, shape=(n_samples, n_classes)
            Class probabilities of input data.
            The order of classes is in sorted ascending order.
        """
        raise NotImplementedError("No predict_proba function implemented")

    def score(self, Y, X=None, Y_hat=None, weights=None):
        """
        Return mean metric of the estimator on the given
        data/predictions and target labels.

        If both data and predictions are provided, `score`
        just uses the predictions.

        Parameters
        ----------
        Y : array-like, shape=(n_samples,)
            Target labels as integers.

        X : array-like, shape=(n_samples, n_features), default=None
            Data to predict.

        Y_hat : array-like, shape=(n_samples,), default=None
            Predicted labels.

        weights : array-like, shape=(n_samples,), default=None
            Sample weights. If None, then samples are equally weighted.

        Returns
        -------
        score : float
            Mean metric score of the estimator for the given
            data/labels.
        """
        if X is None and Y_hat is None:
            raise ValueError("Either X or Y_hat must be provided")
        elif Y_hat is None:
            Y_hat = self.predict(X)
        if self.metric is not None:
            if weights is None : weights = np.ones(len(Y_hat))
            return self.metric.score(Y_hat, Y, weights=weights)
        return 0

    def _is_fitted(self):
        """
        Return True if the model is properly ready
        for prediction.

        Returns
        -------
        fitted : bool
            True if the model can be used to predict data.
        """
        return self.fitted_


class BaseClassifier(BaseEstimator):
    """
    Base Estimator Class.
    Implements common methods for estimators.

    Parameters
    ----------
    loss : str, LossFunction, default='cross-entropy'
        Loss function to use for training. Must be
        one of the default loss functions or an object
        that extends LossFunction.

    max_iter : int, default=10
        Maximum number of epochs to conduct during training.

    tol : float, default=1e-4
        Convergence criteria for early stopping.

    batch_size : int, float, default=None
        Batch size for training. Must be one of:
         - int : Use `batch_size`.
         - float : Use `batch_size * n_samples`.
         - None : Use `n_samples`.

    verbose : int, default=0
        Verbosity of estimator; higher values result in
        more verbose output.

    warm_start : bool, default=False
        Determines warm starting to allow training to pick
        up from previous training sessions.

    class_weight : dict, 'balanced', or None, default=None
        Weights associated with classes in the form
        `{class_label: weight}`. Must be one of:
         - None : All classes have a weight of one.
         - 'balanced': Class weights are automatically calculated as
                        `n_samples / (n_samples * np.bincount(Y))`.

    metric : str, Metric, or None, default='accuracy'
        Metric for estimator score.

    random_state : None or int or RandomState, default=None
        Initial seed for the RandomState. If seed is None,
        return the RandomState singleton. If seed is an int,
        return a RandomState with the seed set to the int.
        If seed is a RandomState, return that RandomState.

    Attributes
    ----------
    fitted_ : bool
        True if the model has been deemed trained and
        ready to predict new data.

    n_classes_ : int
        Number of classes.

    n_features : int
        Number of features accepted as input.
    """
    def __init__(self, loss='cross-entropy', max_iter=100, tol=1e-4,
                    batch_size=None, verbose=0, warm_start=False,
                    class_weight=None, metric='accuracy', random_state=None):
        super().__init__(verbose=verbose, warm_start=warm_start,
                            metric=metric)
        self.loss = get_loss(loss)
        self.max_iter = max_iter
        self.tol = tol
        self.batch_size = batch_size
        self.class_weight = class_weight
        self.random_state = create_random_state(seed=random_state)
        self._x = []
        self._z = []

    @abstractmethod
    def _initialize(self, *args, **kwargs):
        """
        Initialize the parameters of the NNRF.
        """
        raise NotImplementedError("No initialize function implemented")

    def fit(self, X, Y, *args, weights=None, **kwargs):
        """
        Train the model on the given data and labels.

        Parameters
        ----------
        X : array-like, shape=(n_samples, n_features)
            Training data.

        Y : array-like, shape=(n_samples,)
            Target labels as integers.

        weights : array-like, shape=(n_samples,), default=None
            Sample weights. If None, then samples are equally weighted.

        Returns
        -------
        self : Base
            Fitted estimator.
        """
        X, Y = check_XY(X=X, Y=Y)
        if self.n_classes_ is None : self.n_classes_ = len(set(decode(Y)))
        if self.n_features_ is None : self.n_features_ = X.shape[1]
        try : Y = one_hot(Y, cols=self.n_classes_)
        except : raise
        batch_size = calculate_batch(self.batch_size, len(Y))
        ds = BatchDataset(X, Y, weights, seed=self.random_state).shuffle().repeat().batch(batch_size)
        if not self.warm_start or not self._is_fitted():
            if self.verbose > 0 : print("Initializing model")
            self._initialize()
        if self.verbose > 0 : print("Training model for %d epochs" % self.max_iter,
                                "on %d samples in batches of %d." % \
                                (X.shape[0], batch_size))
        loss_prev, early_stop, e = np.inf, False, 0
        if self.verbose == 1 : epochs = trange(self.max_iter)
        else : epochs = range(self.max_iter)
        for e in epochs:
            batches = range(ds.n_batches)
            if self.verbose == 2 : batches = trange(ds.n_batches)
            elif self.verbose > 2 : print("Epoch %d" % e)
            for b in batches:
                X_batch, Y_batch, weights = ds.next()
                if len(X_batch) == 0:
                    if self.verbose > 0 : print("No more data to train. Ending training.")
                    early_stop = True
                    break
                Y_hat = self._forward(X_batch)
                loss = np.mean(np.sum(self.loss.loss(Y_hat, Y_batch), axis=1))
                metric = self.score(Y_batch, Y_hat=Y_hat, weights=weights)
                msg = 'loss: %.4f' % loss + ', ' + self.metric.name + ': %.4f' % metric
                if self.verbose == 1 : epochs.set_description(msg)
                elif self.verbose == 2 : batches.set_description(msg)
                elif self.verbose > 2 : print("Epoch %d, Batch %d completed." % (e+1, b+1), msg)
                if self.tol is not None and np.abs(loss - loss_prev) < self.tol:
                    early_stop = True
                    break
                self._backward(Y_hat, Y_batch, weights=weights)
                loss_prev = loss
            if early_stop : break
        self.fitted_ = True
        if self.verbose > 0 : print("Training complete.")
        return self

    def predict_proba(self, X, *args, **kwargs):
        """
        Predict class probabilities for each sample in `X`.

        Parameters
        ----------
        X : array-like, shape=(n_samples, n_features)
            Data to predict.

        Returns
        -------
        proba : array-like, shape=(n_samples, n_classes)
            Class probabilities of input data.
            The order of classes is in sorted ascending order.
        """
        if not self._is_fitted():
            raise RuntimeError("Model is not fitted")
        X = check_XY(X=X)
        if X.shape[1] != self.n_features_:
            raise ValueError("Model takes %d features as input" % self.n_features_,
                                "but data has %d features" % X.shape[1])
        if self.verbose > 0 : print("Predicting %d samples." % \
                                X.shape[0])
        return self._forward(X)

    def feature_importance(self, X, Y):
        """
        Calculate the feature importances by permuting
        each feature separately and measuring the
        increase in loss.

        Parameters
        ----------
        X : array-like, shape=(n_samples, n_features)
            Training data.

        Y : array-like, shape=(n_samples,)
            Target labels as integers.

        Returns
        -------
        importances : list, shape=(n_features,)
            List of feature importances by error increase,
            in order of features as they appear in the data.
            The larger the error increase, the more
            important the feature.
        """
        if not self._is_fitted():
            raise RuntimeError("Model is not fitted")
        X, Y = check_XY(X=X, Y=Y)
        try : Y = one_hot(Y, cols=self.n_classes_)
        except : raise
        if X.shape[1] != self.n_features_:
            raise ValueError("Model takes %d features as input" % self.n_features_,
                                "but data has %d features" % X.shape[1])
        if self.verbose > 0 : print("Calculating feature importances")
        loss = np.exp(self.loss.loss(self.predict_proba(X), Y))
        importances = []
        for f in range(X.shape[1]):
            X_ = copy(X)
            self.random_state.shuffle(X_[:,f])
            loss_ = self.loss.loss(self.predict_proba(X_), Y)
            importances.append(np.exp(loss_) / loss)
        return importances

    @abstractmethod
    def _forward(self, X, *args, **kwargs):
        """
        Conduct the forward propagation steps through the model.

        Parameters
        ----------
        X : array-like, shape=(n_samples, n_features)
            Data to predict.

        Returns
        -------
        Y_hat : array-like, shape=(n_samples, n_classes)
            Output.
        """
        raise NotImplementedError("No forward propagation implemented")

    def _backward(self, Y_hat, Y, *args, weights=None, **kwargs):
        """
        Conduct the backward propagation steps through the
        model.

        Parameters
        ----------
        Y_hat : array-like, shape=(n_samples, n_classes)
            Model output.

        Y : array-like, shape=(n_samples, n_classes)
            Target labels in one hot encoding.

        weights : array-like, shape=(n_samples,), default=None
            Sample weights. If None, then samples are equally weighted.
        """
        return