w4k2/stream-learn

strlearn/streams/SemiSyntheticStreamGenerator.py

import numpy as np
import pandas as pd
from scipy.interpolate import interp1d

class SemiSyntheticStreamGenerator:
    """ Semi-Synthetic Data streams generator for drifting data streams.

    A generator that allows preparing a replicable classification dataset based on real-world input data. The generator uses one-dimensional interpolation to generate the drifting projections, based on which the final data stream is generated. 

    :param X: Static dataset features.
    :param y: Static dataset labels.
    :param n_chunks: The number of data chunks that the stream is composed of.
    :param chunk_size: The number of instances in each data chunk.
    :param random_state: The seed used by the random number generator.
    :param n_drifts: The number of concept changes in the data stream.
    :param n_features: The number of features in the output stream.
    :param interpolation: Interpolation type.
    :param stabilize_factor: The fraction of the distance between drifts for which a concept remains stable around its basepoint.
    :param binarize: Flag indicating whether the labels should be binarized (all non-zero classes merged into a single positive class).
    :type X: array-like, shape (n_samples, n_features)
    :type y: array-like, shape (n_samples, )
    :type n_chunks: integer, optional (default=200)
    :type chunk_size: integer, optional (default=250)
    :type random_state: integer, optional (default=None)
    :type n_drifts: integer, optional (default=2)
    :type n_features: integer, optional (default=10)
    :type interpolation: string, optional (default='nearest')
    :type stabilize_factor: float, optional (default=0.2)
    :type binarize: boolean, optional (default=True)

    :Example:
    >>> import strlearn as sl
    >>> from sklearn.datasets import load_breast_cancer
    >>> from sklearn.naive_bayes import GaussianNB
    >>> X, y = load_breast_cancer(return_X_y=True)
    >>> stream = sl.streams.SemiSyntheticStreamGenerator(X, y, n_drifts=4, interpolation='cubic')
    >>> clf = GaussianNB()
    >>> evaluator = sl.evaluators.TestThenTrain()
    >>> evaluator.process(stream, clf)
    >>> print(stream._get_drifts())
    [ 14  48  89 155]
    """

    def __init__(
        self,
        X, 
        y,
        n_chunks=200,
        chunk_size=250,
        random_state=None,
        n_drifts=2,
        n_features=10,
        interpolation='nearest',
        stabilize_factor=0.2,
        binarize=True
    ):
        self.n_chunks = n_chunks
        self.chunk_size = chunk_size
        self.random_state = random_state if random_state is not None else np.random.randint(10000)
        self.n_drifts = n_drifts
        self.n_samples = self.n_chunks * self.chunk_size
        self.n_features = n_features
        self.interpolation = interpolation
        self.stabilize_factor = stabilize_factor
        self.binarize = binarize

        self.X_base = np.copy(X)
        self.y_base = np.copy(y)

        self.X = np.copy(X)
        self.y = np.copy(y)

        self.drift_basepoints = None
    
    def _get_drifts(self):
        # Chunk indexes of concept drifts -- midpoints between consecutive
        # drift basepoints, expressed in chunks.
        if self.drift_basepoints is None:
            return []

        midpoints = np.array([
            int(self.drift_basepoints[bp_id - 1]
                + (self.drift_basepoints[bp_id] - self.drift_basepoints[bp_id - 1]) / 2)
            for bp_id in range(1, len(self.drift_basepoints))
        ])
        return (midpoints / self.chunk_size).astype(int)

    def _get_distances(self):
        # Distances (in samples) between consecutive drift basepoints.
        if self.drift_basepoints is None:
            return []

        return [(self.drift_basepoints[bp_id] - self.drift_basepoints[bp_id - 1])
                for bp_id in range(1, len(self.drift_basepoints))]

    def _make_stream(self):
        np.random.seed(self.random_state)
        
        # Optionally binarize the data
        if self.binarize:
            self.y[self.y!=0] = 1

        self.classes_ = np.unique(self.y)
        class_indexes = []

        # Resample (with replacement) per class, preserving the original class
        # proportions; the last class absorbs the rounding remainder so that
        # the total number of samples equals n_samples
        for c_id, c in enumerate(self.classes_):
            ir = len(np.argwhere(self.y==c))/len(self.y)
            if c_id == len(self.classes_)-1:
                samples = self.n_samples - len(np.concatenate(class_indexes))
            else:
                samples = int(np.rint(ir*self.n_samples))
            indexes = np.random.choice(np.argwhere(self.y==c).flatten(), samples)
            class_indexes.append(indexes)

        X_arrs = [self.X[ind] for ind in class_indexes]
        y_arrs = [self.y[ind] for ind in class_indexes]

        self.X = np.concatenate(X_arrs)
        self.y = np.concatenate(y_arrs)

        # Shuffle the data
        p = np.random.permutation(len(self.y))

        self.X = self.X[p]
        self.y = self.y[p]

        # Generate basepoints
        self.drift_basepoints = np.linspace(0,self.n_samples,self.n_drifts+1)
        self.drift_basepoints[0] = 0
        self.drift_basepoints[-1] = self.n_samples

        # Draw a random projection matrix for each concept
        n_concept_features = self.X.shape[1]
        base_projections = np.random.normal(size=(self.n_drifts + 1,
                                                   n_concept_features,
                                                   self.n_features))
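
        # Each concept i is described by the random projection matrix
        # base_projections[i] of shape (n_concept_features, n_features).
        # Interpolating element-wise between consecutive matrices along the
        # sample axis produces the drifting projection applied to the input.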


        # Add auxiliary points around each basepoint so that the projection
        # stays constant for a while (controlled by stabilize_factor) before
        # and after each concept change
        distances = self._get_distances()

        _drift_basepoints = []
        _base_projections = []
        for p_id, p in enumerate(self.drift_basepoints):
            sep_neg = int(distances[p_id - 1] * self.stabilize_factor)
            try:
                sep_pos = int(distances[p_id] * self.stabilize_factor)
            except IndexError:
                # The last basepoint has no following segment
                sep_pos = 0

            _drift_basepoints.append(p - sep_neg)
            _drift_basepoints.append(p)
            _drift_basepoints.append(p + sep_pos)

            _base_projections.extend([base_projections[p_id]] * 3)

        # Remove first and last - out of range
        drift_basepoints_aux = np.array(_drift_basepoints)[1:-1]
        base_projections_aux = np.array(_base_projections)[1:-1]

        # Generate stream
        continuous_projections = np.zeros((self.n_samples, n_concept_features, self.n_features))
        stream_basepoints = np.linspace(0, self.n_samples - 1, self.n_samples).astype(int)

        for d_s in range(self.n_features):
            for d_c in range(n_concept_features):
                original_values = base_projections_aux[:, d_c, d_s]
                f = interp1d(drift_basepoints_aux, original_values, kind=self.interpolation)
                continuous_projections[:, d_c, d_s] = f(stream_basepoints)

        X_s = np.sum(self.X[:, :, np.newaxis] * continuous_projections, axis=1)

        return X_s, np.copy(self.y)


    def is_dry(self):
        return (
            self.chunk_id + 1 >= self.n_chunks if hasattr(self, "chunk_id") else False
        )

    def reset(self):
        self.previous_chunk = None
        self.chunk_id = -1

    def get_chunk(self):
        """
        Generate a data chunk of the stream.

        Used by all evaluators but also accessible for custom evaluation.

        :returns: Generated samples and target values.
        :rtype: tuple {array-like, shape (n_samples, n_features), array-like, shape (n_samples, )}
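
        :Example:
        >>> # Illustrative sketch; assumes `import strlearn as sl` and a static dataset X, y
        >>> stream = sl.streams.SemiSyntheticStreamGenerator(X, y)
        >>> while not stream.is_dry():
        ...     X_chunk, y_chunk = stream.get_chunk()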
        """
        if hasattr(self, "generated_X"):
            self.previous_chunk = self.current_chunk
        else:
            self.generated_X, self.generated_y = self._make_stream()
            self.reset()

        self.chunk_id += 1

        if self.chunk_id < self.n_chunks:
            start, end = (
                self.chunk_size * self.chunk_id,
                self.chunk_size * self.chunk_id + self.chunk_size,
            )

            self.current_chunk = (self.generated_X[start:end], self.generated_y[start:end])
            return self.current_chunk
        else:
            return None
        
    def __next__(self):
        # Note: __next__ is a generator function; __iter__ returns the
        # generator it produces, so the stream can be used in a for loop.
        while not self.is_dry():
            yield self.get_chunk()

    def __iter__(self):
        return next(self)

    def save_to_arff(self, filepath):
        """
        Save the generated stream to a file in the ARFF format.

        :param filepath: Path to the file where data will be saved in ARFF format.
        :type filepath: string
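
        :Example:
        >>> # Illustrative usage; the output path is arbitrary
        >>> stream = sl.streams.SemiSyntheticStreamGenerator(X, y)
        >>> stream.save_to_arff('semi_synthetic.arff')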
        """
        X_array = []
        y_array = []

        for i in range(self.n_chunks):
            X, y = self.get_chunk()
            X_array.extend(X)
            y_array.extend(y)

        X_array = np.array(X_array)
        y_array = np.array(y_array)
        classes = np.unique(y_array)
        data = np.column_stack((X_array, y_array))

        header = "@relation %s %s\n\n" % (
            (filepath.split("/")[-1]).split(".")[0],
            str(self),
        )

        for feature in range(self.n_features):
            header += "@attribute feature" + str(feature + 1) + " numeric \n"

        header += "@attribute class {%s} \n\n" % ",".join(map(str, classes))
        header += "@data\n"

        with open(filepath, "w") as file:
            file.write(str(header))
            np.savetxt(file, data, fmt="%.20g", delimiter=",")
            file.write("\n")

        self.reset()

    def save_to_npy(self, filepath):
        """
        Save the generated stream to a file in the numpy (.npy) format.

        :param filepath: Path to the file where data will be saved in numpy format.
        :type filepath: string
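
        :Example:
        >>> # Illustrative usage; the output path is arbitrary
        >>> stream = sl.streams.SemiSyntheticStreamGenerator(X, y)
        >>> stream.save_to_npy('semi_synthetic.npy')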
        """
        X, y = self._make_stream()
        ds = np.concatenate([X, y[:, np.newaxis]], axis=1)
        np.save(filepath, ds)


    def save_to_csv(self, filepath):
        """
        Save the generated stream to a file in the CSV format.

        :param filepath: Path to the file where data will be saved in csv format.
        :type filepath: string
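
        :Example:
        >>> # Illustrative usage; the output path is arbitrary
        >>> stream = sl.streams.SemiSyntheticStreamGenerator(X, y)
        >>> stream.save_to_csv('semi_synthetic.csv')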
        """
        X, y = self._make_stream()

        ds = np.concatenate([X, y[:, np.newaxis]], axis=1)

        pdds = pd.DataFrame(ds)
        # infer_objects() returns a new DataFrame, so the result must be kept
        pdds = pdds.infer_objects()
        pdds.iloc[:, -1] = pdds.iloc[:, -1].astype(int)
        pdds.to_csv(filepath, header=None, index=None)
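

if __name__ == "__main__":
    # Illustrative sketch (an assumption, not part of the library API): build a
    # small semi-synthetic stream from a static dataset and iterate its chunks.
    from sklearn.datasets import load_breast_cancer

    X, y = load_breast_cancer(return_X_y=True)
    stream = SemiSyntheticStreamGenerator(
        X, y, n_chunks=20, chunk_size=100, n_drifts=2, random_state=42
    )

    while not stream.is_dry():
        X_chunk, y_chunk = stream.get_chunk()
        print(X_chunk.shape, np.unique(y_chunk))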