# w4k2/stream-learn: strlearn/streams/ARFFParser.py
import numpy as np
from sklearn import preprocessing

ATYPES = ("nominal", "numeric")


class ARFFParser:
    """ Stream-aware parser of datasets in ARFF format.

    :type path: string
    :param path: Path to the ARFF file.
    :type chunk_size: integer, optional (default=200)
    :param chunk_size: The number of instances in each data chunk.
    :type n_chunks: integer, optional (default=250)
    :param n_chunks: The number of data chunks that the stream is composed of.

    :Example:

    >>> import strlearn as sl
    >>> stream = sl.streams.ARFFParser("Agrawal.arff")
    >>> clf = sl.classifiers.AccumulatedSamplesClassifier()
    >>> evaluator = sl.evaluators.PrequentialEvaluator()
    >>> evaluator.process(clf, stream)
    >>> stream.reset()
    >>> print(evaluator.scores_)
    ...
    [[0.855      0.80815508 0.79478582 0.80815508 0.89679715]
     [0.795      0.75827674 0.7426779  0.75827674 0.84644195]
     [0.8        0.75313899 0.73559983 0.75313899 0.85507246]
     ...
     [0.885      0.86181169 0.85534199 0.86181169 0.91119691]
     [0.895      0.86935764 0.86452058 0.86935764 0.92134831]
     [0.87       0.85104088 0.84813907 0.85104088 0.9       ]]
    """

    def __init__(self, path, chunk_size=200, n_chunks=250):
        # Open the ARFF file.
        self.name = path
        self._f = open(path, "r")
        self.chunk_size = chunk_size
        self.n_chunks = n_chunks

        # Prepare header storage
        self.types = []
        self.names = []
        self.lencs = {}

        self.chunk_id = 0
        self.starting_chunk = False
        # Analyze its header
        while True:
            line = self._f.readline()[:-1]
            pos = self._f.tell()
            if line == "@data":
                # Some files leave a blank line after @data; consume it,
                # otherwise rewind to the first data line.
                line = self._f.readline()
                if line not in ["\n", "\r\n"]:
                    self._f.seek(pos)
                break

            elements = line.split(" ")
            if elements[0] == "@attribute":
                if elements[1] == "class":
                    # Analyze the class attribute; drop a trailing empty
                    # token left by a trailing space.
                    if elements[-1] == "":
                        elements.pop()
                    if len(elements) > 3:
                        # Class names listed with spaces after the commas.
                        _, _, class_names = line.split(" ", maxsplit=2)
                        class_names = class_names[1:-1].replace(" ", "").split(",")
                    else:
                        class_names = elements[2][1:-1].split(",")

                    self.le = preprocessing.LabelEncoder()
                    self.le.fit(np.array(class_names))
                    self.classes_ = self.le.transform(self.le.classes_)
                    self.n_classes = len(self.classes_)
                else:
                    self.names.append(elements[1])
                    if elements[2][0] == "{" and elements[2][1] != "'":
                        # Nominal attribute with unquoted values.
                        self.types.append("nominal")
                        le = preprocessing.LabelEncoder()
                        le.fit(np.array(elements[2][1:-1].split(",")))
                        self.lencs.update({len(self.names) - 1: le})
                    elif elements[2][0] == "{" and elements[2][1] == "'":
                        # Nominal attribute with single-quoted values;
                        # strip the quotes before fitting the encoder.
                        self.types.append("nominal")
                        le = preprocessing.LabelEncoder()
                        temporary = np.array(elements[2][1:-1].split(","))
                        temporary = np.array([element[1:-1] for element in temporary])
                        le.fit(temporary)
                        self.lencs.update({len(self.names) - 1: le})
                    elif elements[2] in ["numeric", "real"]:
                        self.types.append("numeric")
            elif elements[0] == "@relation":
                self.relation = " ".join(elements[1:])

        self.types = np.array(self.types)
        self.nominal_atts = np.where(self.types == "nominal")[0]
        self.numeric_atts = np.where(self.types == "numeric")[0]
        self.n_attributes = self.types.shape[0]
        self.is_dry_ = False

        # Read the first data line.
        self.a_line = self._f.readline()
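
        # For reference, a minimal sketch of the header shape this parser
        # expects; the relation, attribute names, and values below are
        # hypothetical, for illustration only:
        #
        #   @relation example
        #   @attribute f0 numeric
        #   @attribute f1 {low,high}
        #   @attribute f2 {'low','high'}
        #   @attribute class {negative, positive}
        #   @data
        #   1250.0,low,high,negative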

    def __del__(self):
        # Guard against __init__ failing before the file was opened.
        if hasattr(self, "_f"):
            self._f.close()

    def __str__(self):
        return self.name

    def __next__(self):
        # Serve the next chunk, or signal that the stream is exhausted.
        if self.is_dry():
            raise StopIteration
        return self.get_chunk()

    def __iter__(self):
        return self

    def is_dry(self):
        """
        Checking if we have reached the end of the stream.

        :returns: flag showing if the stream has ended
        :rtype: boolean
        """

        return (
            self.chunk_id + 1 >= self.n_chunks if hasattr(self, "chunk_id") else False
        )

    def get_chunk(self):
        """
        Generating a data chunk of a stream.

        Used by all evaluators, but also accessible for custom evaluation.

        :returns: Generated samples and target values.
        :rtype: tuple {array-like, shape (n_samples, n_features), array-like, shape (n_samples, )}
        """
        if self.chunk_id == 0 and self.starting_chunk is False:
            self.previous_chunk = None
            self.chunk_id = -1
            self.starting_chunk = True
        else:
            self.previous_chunk = self.current_chunk

        size = self.chunk_size
        X, y = np.zeros((size, self.n_attributes)), []
        for i in range(size):
            if not self.a_line.endswith("\n"):
                # A line without a trailing newline is the last in the file.
                self.is_dry_ = True
                line = self.a_line
            elif self.a_line == "\n":
                # Some ARFF files end with empty lines; treat them as the end.
                self.is_dry_ = True
                continue
            else:
                line = self.a_line[:-1]
            elements = line.split(",")

            # Get class
            if elements[-1] == "":
                y.append(elements[-2].strip())
            else:
                y.append(elements[-1].strip())

            # Read attributes
            attributes = np.array(elements[:-1])
            attributes = np.array([att.strip() for att in attributes])

            # Assign numeric attributes directly and encode nominal ones.
            X[i, self.numeric_atts] = attributes[self.numeric_atts]
            X[i, self.nominal_atts] = [
                self.lencs[j].transform([attributes[j]])[0] for j in self.nominal_atts
            ]

            if self.is_dry_:
                break
            else:
                # Read the next line; this also catches a dry stream whose
                # length is divisible by the chunk size.
                self.a_line = self._f.readline()

        y = self.le.transform(y)
        self.chunk_id += 1
        self.current_chunk = X[: i + 1, :], y[: i + 1]
        return self.current_chunk
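
    # A minimal sketch of the test-then-train loop that evaluators run on
    # top of is_dry() and get_chunk(); clf stands for a hypothetical
    # incremental classifier with a scikit-learn-style interface:
    #
    #   while not stream.is_dry():
    #       X, y = stream.get_chunk()
    #       if stream.previous_chunk is not None:
    #           print(clf.score(X, y))
    #       clf.partial_fit(X, y, classes=stream.classes_)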

    def reset(self):
        "Reset processed stream and close ARFF file."
        self.is_dry_ = False
        self.chunk_id = 0
        self._f.close()
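

# A minimal usage sketch, assuming a file named "example.arff" exists in the
# working directory; the file name and parameters here are hypothetical.
if __name__ == "__main__":
    stream = ARFFParser("example.arff", chunk_size=100, n_chunks=10)
    for X, y in stream:
        # Each chunk holds up to chunk_size rows of n_attributes features,
        # paired with label-encoded targets.
        print(stream.chunk_id, X.shape, y.shape)
    stream.reset()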