w4k2/stream-learn

View on GitHub
strlearn/detectors/DDM.py

Summary

Maintainability
A
2 hrs
Test Coverage
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin

class DDM(BaseEstimator, ClassifierMixin):
    def __init__(self, warning_lvl=2, drift_lvl=3, skip=30):
        self.drift_lvl = drift_lvl
        self.skip = skip

        self.p_min = np.inf
        self.s_min = np.inf
        self.p_i = 1
        self.s_i = 0
        self.cnt = 1

        self.drift = []
        self._is_drift = False


    def feed(self, X, real, pred):

        if len(pred) != len(real):
            self.drift.append(0)
            self._is_drift = False

            return self

        chunk_p_i = []
        chunk_s_i = []
        chunk_p_mins = []
        chunk_s_mins = []

        err = np.abs(real-pred)
        for e in err:
            self.p_i += (e - self.p_i)/self.cnt
            self.s_i = np.sqrt(self.p_i * (1- self.p_i)/self.cnt)
            self.cnt+=1

            chunk_p_i.append(self.p_i)
            chunk_s_i.append(self.s_i)

            if self.p_i + self.s_i < self.p_min + self.s_min:
                self.p_min = self.p_i
                self.s_min = self.s_i

            chunk_p_mins.append(self.p_min)
            chunk_s_mins.append(self.s_min)

            if self.cnt > self.skip and self.p_i + self.s_i > self.p_min + self.drift_lvl*self.s_min:
                # reset
                self.p_min = np.inf
                self.s_min = np.inf
                self.p_i = 1
                self.s_i = 0
                self.cnt = 1
                self._is_drift = True
                break
            else:
                self._is_drift = False

        
        if self._is_drift:
            self.drift.append(2)
        else:
            self.drift.append(0)

        return self