w4k2/stream-learn

View on GitHub
strlearn/ensembles/UOB.py

Summary

Maintainability
F
3 days
Test Coverage
import numpy as np
from sklearn.base import clone
from ..ensembles.base import StreamingEnsemble

class UOB(StreamingEnsemble):
    """
    Undersampling-Based Online Bagging.

    Every base estimator is trained incrementally on each chunk with
    Poisson-distributed integer sample weights. Time-decayed class sizes are
    tracked instance by instance; an instance whose class is currently the
    larger one is drawn with a rate equal to the ratio of the smaller to the
    larger class size (below 1), every other instance with a rate of 1.

    The class-size tracker holds exactly two entries and treats label ``0``
    as the first class and any other label as the second one, so only binary
    problems are supported.

    Parameters
    ----------
    base_estimator : estimator, optional
        Prototype that is cloned ``n_estimators`` times on the first
        processed chunk. It is called with
        ``partial_fit(X, y, classes, sample_weight=...)``.
    n_estimators : int, default 5
        Number of ensemble members.
    time_decay_factor : float, default 0.9
        Multiplier applied to both class sizes at every instance; the class
        of the observed instance additionally gains
        ``1 - time_decay_factor``.

    Attributes
    ----------
    current_tdcs_ : ndarray of shape (1, 2)
        Time-decayed class sizes after the most recently processed instance.
    last_instance_sizes : ndarray of shape (1, 2)
        Same array object as ``current_tdcs_``; its presence marks that at
        least one chunk has been processed, and it seeds the next chunk.
    chunk_tdcs : ndarray of shape (n_samples, 2)
        Class sizes recorded after each instance of the last chunk.
    weights : ndarray of shape (n_estimators, n_samples)
        Sample weights drawn for the last chunk, one row per estimator.
    """

    def __init__(self, base_estimator=None, n_estimators=5, time_decay_factor=0.9):
        """Initialization."""
        super().__init__(base_estimator, n_estimators)
        self.time_decay_factor = time_decay_factor

    def partial_fit(self, X, y, classes=None):
        """Update the ensemble with a single chunk of data.

        Parameters
        ----------
        X : array-like
            Chunk of samples; forwarded to the parent ``partial_fit``.
        y : array-like
            Chunk labels; forwarded to the parent ``partial_fit``.
        classes : array-like, optional
            Known class labels; forwarded to the parent ``partial_fit``.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``classes_`` does not contain exactly two classes.
        """
        # NOTE(review): green_light, X_, y_, classes_ and ensemble_ are not
        # assigned in this class; presumably the parent partial_fit sets
        # them - confirm against StreamingEnsemble.
        super().partial_fit(X, y, classes)
        if not self.green_light:
            return self

        if len(self.ensemble_) == 0:
            self.ensemble_ = [
                clone(self.base_estimator) for _ in range(self.n_estimators)
            ]

        # The tracker below is hard-wired to two classes; fail with a clear
        # message instead of an opaque NumPy broadcasting error.
        if self.classes_.shape[0] != 2:
            raise ValueError(
                "UOB supports binary problems only, got %d classes."
                % self.classes_.shape[0]
            )

        self._track_class_sizes()
        self.weights = self._draw_sample_weights()

        for w, base_model in enumerate(self.ensemble_):
            base_model.partial_fit(
                self.X_, self.y_, self.classes_, sample_weight=self.weights[w]
            )

        return self

    def _track_class_sizes(self):
        """Advance the time-decayed class sizes through the current chunk."""
        if not hasattr(self, "last_instance_sizes"):
            self.current_tdcs_ = np.zeros((1, 2))
        else:
            self.current_tdcs_ = self.last_instance_sizes

        self.chunk_tdcs = np.ones((self.X_.shape[0], self.classes_.shape[0]))

        decay = self.time_decay_factor
        gain = 1 - decay
        for row, label in enumerate(self.y_):
            # Both sizes decay at every step; only the observed class gains.
            # Any label other than 0 is counted as the second class.
            observed = 0 if label == 0 else 1
            self.current_tdcs_ *= decay
            self.current_tdcs_[0, observed] += gain
            # Row assignment copies the values, so later in-place updates of
            # current_tdcs_ do not alter rows that were already recorded.
            self.chunk_tdcs[row] = self.current_tdcs_

        self.last_instance_sizes = self.current_tdcs_

    def _draw_sample_weights(self):
        """Draw Poisson sample weights, shaped (n_estimators, n_samples)."""
        labels = np.asarray(self.y_)
        first_size = self.chunk_tdcs[:, 0]
        second_size = self.chunk_tdcs[:, 1]

        # Default rate is 1. It is lowered only for instances whose own class
        # is strictly the larger one at that point of the stream; the strict
        # comparison also guarantees a non-zero denominator.
        rates = np.ones(labels.shape[0])
        second_dominates = (labels == 1) & (second_size > first_size)
        first_dominates = (labels == 0) & (first_size > second_size)
        rates[second_dominates] = (
            first_size[second_dominates] / second_size[second_dominates]
        )
        rates[first_dominates] = (
            second_size[first_dominates] / first_size[first_dominates]
        )

        # Drawn as (n_samples, n_estimators) so that every instance's rate is
        # broadcast across the estimators, then transposed so that each row
        # holds the weights for one estimator.
        draws = np.random.poisson(
            rates[:, np.newaxis], size=(labels.shape[0], self.n_estimators)
        )
        return draws.T