w4k2/stream-learn

View on GitHub
strlearn/ensembles/CDS.py

Summary

Maintainability
C
1 day
Test Coverage
from sklearn.base import clone
import numpy as np
from imblearn.over_sampling import SMOTE
from sklearn.metrics import accuracy_score
from ..ensembles.base import StreamingEnsemble

class CDS(StreamingEnsemble):
    """
    CDS

    Ditzler, Gregory, and Robi Polikar. "Incremental learning of concept drift from streaming imbalanced data." IEEE Transactions on Knowledge and Data Engineering 25.10 (2013): 2283-2301.
    """

    def __init__(self,
                 base_estimator=None,
                 n_estimators=10,
                 a=2,
                 b=2):
        super().__init__(base_estimator, n_estimators, weighted=True)
        self.a = a
        self.b = b

    def partial_fit(self, X, y, classes=None):
        super().partial_fit(X, y, classes)
        if not self.green_light:
            return self

        # Find minority and majority names
        if not hasattr(self, "minority_name") or not hasattr(self, "majority_name"):
            self.minority_name, self.majority_name = self.minority_majority_name(y)

        self.n_instances = len(y)

        if self.ensemble_:
            y_pred = self.predict(X)

            E = (1 - accuracy_score(y, y_pred))

            eq = np.equal(y, y_pred)

            w = np.zeros(eq.shape)
            w[eq == True] = E/float(self.n_instances)
            w[eq == False] = 1/float(self.n_instances)

            w_sum = np.sum(w)

            D = w/w_sum

            res_X, res_y = self._resample(X, y)

            new_classifier = clone(self.base_estimator).fit(res_X, res_y)
            self.ensemble_.append(new_classifier)

            beta = []
            epsilon_sum_array = []

            for j in range(len(self.ensemble_)):
                y_pred = self.ensemble_[j].predict(X)

                eq_2 = np.not_equal(y, y_pred).astype(int)

                epsilon_sum = np.sum(eq_2*D)
                epsilon_sum_array.append(epsilon_sum)

                if epsilon_sum > 0.5:
                    if j is len(self.ensemble_) - 1:
                        self.ensemble_[j] = clone(self.base_estimator).fit(res_X, res_y)
                    else:
                        epsilon_sum = 0.5

            epsilon_sum_array = np.array(epsilon_sum_array)
            beta = epsilon_sum_array / (1 - epsilon_sum_array)

            sigma = []
            a = self.a
            b = self.b
            t = len(self.ensemble_)
            k = np.array(range(t))

            sigma = 1/(1 + np.exp(-a*(t-k-b)))

            sigma_mean = []
            for k in range(t):
                sigma_sum = np.sum(sigma[0:t-k])
                sigma_mean.append(sigma[k]/sigma_sum)

            beta_mean = []
            for k in range(t):
                beta_sum = np.sum(sigma_mean[0:t-k]*beta[0:t-k])
                beta_mean.append(beta_sum)

            self.weights_ = []
            for b in beta_mean:
                self.weights_.append(np.log(1/b))

            if t >= self.n_estimators:
                ind = np.argmax(beta_mean)
                del self.ensemble_[ind]
                del self.weights_[ind]

        else:
            res_X, res_y = self._resample(X, y)

            new_classifier = clone(self.base_estimator).fit(res_X, res_y)
            self.ensemble_.append(new_classifier)
            self.weights_ = [1]

    def _resample(self, X, y):
        minority, majority = self.minority_majority_split(X, y,
                                                    self.minority_name,
                                                    self.majority_name)
        if len(minority) > 6:
            res_X, res_y = SMOTE().fit_resample(X, y)
        else:
            res_X, res_y = SMOTE(k_neighbors=len(minority)-1).fit_resample(X, y)
        return res_X, res_y