WenjieDu/TSDB

View on GitHub
tsdb/loading_funcs/ucr_uea_datasets.py

Summary

Maintainability
C
1 day
Test Coverage
"""
Scripts related to UCR&UEA datasets http://timeseriesclassification.com/index.php

Most of the code comes from the tslearn library https://github.com/tslearn-team/tslearn.

For more information please refer to:
https://github.com/WenjieDu/TSDB/tree/main/dataset_profiles/ucr_uea_datasets

"""

# Created by Wenjie Du <wenjay.du@gmail.com>
# License: BSD-3-Clause

import os
import warnings

import numpy as np
from sklearn.utils.estimator_checks import _NotAnArray as NotAnArray


def load_ucr_uea_dataset(local_path, dataset_name):
    """Load the train and test splits of a locally stored UCR/UEA dataset.

    The files ``<dataset_name>_TRAIN.<ext>`` and ``<dataset_name>_TEST.<ext>``
    are looked up in ``local_path``. Both splits must be available in the same
    format for that format to be used.

    Parameters
    ----------
    local_path : str
        Directory that contains the dataset files.
    dataset_name : str
        Name of the dataset, i.e. the common prefix of the file names.

    Returns
    -------
    dict or None
        A dictionary with the keys ``"X_train"``, ``"y_train"``, ``"X_test"``
        and ``"y_test"``. ``None`` is returned, after emitting a
        ``RuntimeWarning``, when no matching pair of files exists or when
        anything goes wrong while reading them.
    """
    try:
        # The formats are probed in this order, so ARFF is used when both
        # ARFF and TXT files are present.
        # NOTE(review): a previous comment stated that TXT takes precedence,
        # but the code checks ARFF first -- confirm which one is intended.
        readers = (("arff", _load_arff_uea), ("txt", _load_txt_uea))
        for ext, reader in readers:
            if not _has_files(local_path, dataset_name, ext=ext):
                continue
            train_file = os.path.join(local_path, dataset_name + "_TRAIN." + ext)
            X_train, y_train = reader(train_file)
            test_file = os.path.join(local_path, dataset_name + "_TEST." + ext)
            X_test, y_test = reader(test_file)
            return {
                "X_train": X_train,
                "y_train": y_train,
                "X_test": X_test,
                "y_test": y_test,
            }

        # Neither format offers both a training and a test file.
        warnings.warn(
            'dataset "%s" is not provided in either TXT '
            "or ARFF format and thus could not be loaded" % dataset_name,
            category=RuntimeWarning,
            stacklevel=2,
        )
        return None

    except Exception as exception:
        # Best effort: report the failure as a warning instead of raising.
        warnings.warn(
            'dataset "%s" could be downloaded but not '
            "parsed: %s" % (dataset_name, str(exception)),
            category=RuntimeWarning,
            stacklevel=2,
        )
        return None


def _has_files(data_dir, dataset_name, ext=None):
    """Determines whether some downloaded and unzipped dataset provides
    both training and test data in the given format.

    Parameters
    ----------
    data_dir : str
        the directory in which the dataset files are looked up
    dataset_name : str
        the name of the dataset
    ext : str or None
        the file extension without a dot, e.g `"txt"` or `"arff"`;
        if set to None (the default), `True` will be returned if either TXT
        or ARFF files are present

    Returns
    -------
    bool
        if there are both training and test files with the specified
        file extension
    """
    if ext is None:
        # No format requested: accept a complete train/test pair in any of
        # the supported formats.
        return any(
            _has_files(data_dir, dataset_name, ext=known_ext)
            for known_ext in ("txt", "arff")
        )
    basename = os.path.join(data_dir, dataset_name)
    return all(
        os.path.exists(f"{basename}_{split}.{ext}") for split in ("TRAIN", "TEST")
    )


def ts_size(ts):
    """Return the effective length of a time series.

    Trailing timesteps whose values are `NaN` in every dimension are not
    counted. Infinity and negative infinity are considered valid time series
    values, and `NaN` timesteps that are followed by a valid timestep are
    counted as well.

    Parameters
    ----------
    ts : array-like
        A time series.

    Returns
    -------
    int
        Actual size of the time series.

    Examples
    --------
    >>> ts_size([1, 2, 3, np.nan])
    3
    >>> ts_size([1, np.nan])
    1
    >>> ts_size([np.nan])
    0
    >>> ts_size([[1, 2],
    ...          [2, 3],
    ...          [3, 4],
    ...          [np.nan, 2],
    ...          [np.nan, np.nan]])
    4
    >>> ts_size([np.nan, 3, np.inf, np.nan])
    3
    """
    series = to_time_series(ts)
    # Scan from the end; the first timestep that is not entirely NaN marks
    # the effective end of the series.
    for length in range(series.shape[0], 0, -1):
        if not np.isnan(series[length - 1]).all():
            return length
    return 0


def to_time_series(ts, remove_nans=False):
    """Convert a single time series to the 2D float layout used in
    ``tslearn`` models.

    Parameters
    ----------
    ts : array-like
        The time series to be transformed. Inputs with at most one dimension
        are treated as univariate and reshaped into a single column.
    remove_nans : bool (default: False)
        Whether trailing NaNs at the end of the time series should be removed
        or not

    Returns
    -------
    np.ndarray of shape (sz, d)
        The transformed time series. The data is always copied from the
        input, so the result never shares memory with ``ts``.

    Examples
    --------
    >>> to_time_series([1, 2])
    array([[1.],
           [2.]])
    >>> to_time_series([1, 2, np.nan])
    array([[ 1.],
           [ 2.],
           [nan]])
    >>> to_time_series([1, 2, np.nan], remove_nans=True)
    array([[1.],
           [2.]])

    See Also
    --------
    to_time_series_dataset : Transforms a dataset of time series
    """
    series = np.array(ts, copy=True)
    if series.ndim < 2:
        # Scalars and flat sequences become a single-feature column.
        series = series.reshape((-1, 1))
    if series.dtype != float:
        series = series.astype(float)
    return series[: ts_size(series)] if remove_nans else series


def to_time_series_dataset(dataset, dtype=float):
    """Transforms a time series dataset so that it fits the format used in
    ``tslearn`` models.

    Series of different lengths are padded at the end with NaN up to the
    length of the longest series (trailing NaNs of each series are ignored
    when measuring its length).

    Parameters
    ----------
    dataset : array-like
        The dataset of time series to be transformed. A single time series will
        be automatically wrapped into a dataset with a single entry.
    dtype : data type (default: float)
        Data type for the returned dataset. It is honoured for every kind of
        input, including pandas DataFrames and empty datasets.

    Returns
    -------
    np.ndarray of shape (n_ts, sz, d)
        The transformed dataset of time series.

    Examples
    --------
    >>> to_time_series_dataset([[1, 2]])
    array([[[1.],
            [2.]]])
    >>> to_time_series_dataset([1, 2])
    array([[[1.],
            [2.]]])
    >>> to_time_series_dataset([[1, 2], [1, 4, 3]])
    array([[[ 1.],
            [ 2.],
            [nan]],
    <BLANKLINE>
           [[ 1.],
            [ 4.],
            [ 3.]]])
    >>> to_time_series_dataset([]).shape
    (0, 0, 0)

    See Also
    --------
    to_time_series : Transforms a single time series
    """
    # pandas is optional: DataFrames are only special-cased when it is
    # installed.
    try:
        import pandas as pd
    except ImportError:
        pd = None
    if pd is not None and isinstance(dataset, pd.DataFrame):
        # Forward dtype so the caller's choice is not silently dropped.
        return to_time_series_dataset(np.array(dataset), dtype=dtype)
    if isinstance(dataset, NotAnArray):  # Patch to pass sklearn tests
        return to_time_series_dataset(np.array(dataset), dtype=dtype)
    if len(dataset) == 0:
        return np.zeros((0, 0, 0), dtype=dtype)
    if np.array(dataset[0]).ndim == 0:
        # A flat sequence of scalars is one univariate series, not a dataset.
        dataset = [dataset]

    # Convert every series exactly once; trailing NaNs are already stripped,
    # so the first axis of each array is its effective length.
    series_list = [to_time_series(ts, remove_nans=True) for ts in dataset]
    n_ts = len(series_list)
    max_sz = max(series.shape[0] for series in series_list)
    d = series_list[0].shape[1]

    # Adding NaN promotes integer/boolean buffers to float so the padding
    # can be represented until the final cast to the requested dtype.
    dataset_out = np.zeros((n_ts, max_sz, d), dtype=dtype) + np.nan
    for i, series in enumerate(series_list):
        dataset_out[i, : series.shape[0]] = series
    return dataset_out.astype(dtype)


def _load_arff_uea(
    full_file_path_and_name,
    replace_missing_vals_with="NaN",
):
    """Load data from a classification/regression WEKA arff file to a 3D np array.

    The file is treated as multivariate when its header declares a
    ``relational`` attribute. In that case every data row is expected to look
    like ``'<channel 0>\\n<channel 1>...',<label>``, where the channels are
    separated by a literal backslash-n and values by commas. Otherwise every
    row is a comma-separated list of values whose last entry is the label.
    Blank lines and ARFF comment lines (starting with ``%``) are ignored.

    Parameters
    ----------
    full_file_path_and_name: str
        The full pathname of the .arff file to read.
    replace_missing_vals_with: str
       The value that missing values in the text file (written as ``?``)
       should be replaced with prior to parsing.

    Returns
    -------
    data: np.ndarray
        time series data, np.ndarray (n_cases, n_channels, n_timepoints)
    y : np.ndarray of string
        target variable, one label per case, kept as read from the file

    Raises
    ------
    ValueError
        If a multivariate case has a different number of channels than the
        first case, or if a value cannot be parsed as a float.
    """
    instance_list = []
    class_val_list = []
    data_started = False
    is_multi_variate = False
    n_channels = 1
    n_timepoints = None  # fixed by the first data row
    with open(full_file_path_and_name, encoding="utf-8") as f:
        for line in f:
            stripped = line.strip()
            # Skip blank lines and ARFF comments, wherever they occur.
            if not stripped or stripped.startswith("%"):
                continue

            if not data_started:
                # Header section: only the relational marker and the @data
                # tag matter to this parser.
                lowered = line.lower()
                if "@attribute" in lowered and "relational" in lowered:
                    is_multi_variate = True
                if "@data" in lowered:
                    data_started = True
                continue

            # Data section: every remaining line is one case.
            line = line.replace("?", replace_missing_vals_with)

            if is_multi_variate:
                line, class_val = line.split("',")
                label = class_val.strip()
                channels = line.split("\\n")
                channels[0] = channels[0].replace("'", "")
                if n_timepoints is None:
                    n_channels = len(channels)
                    n_timepoints = len(channels[0].split(","))
                elif len(channels) != n_channels:
                    # Cases parsed so far + 1 is the offending case number.
                    raise ValueError(
                        f" Number of channels not equal in "
                        f"dataset, first case had {n_channels} channel "
                        f"but case number {len(instance_list) + 1} has "
                        f"{len(channels)}"
                    )
                inst = np.zeros(shape=(n_channels, n_timepoints))
                for c, channel in enumerate(channels):
                    inst[c] = np.array([float(i) for i in channel.split(",")])
            else:
                line_parts = line.split(",")
                if n_timepoints is None:
                    n_timepoints = len(line_parts) - 1
                label = line_parts[-1].strip()
                inst = np.zeros(shape=(n_channels, n_timepoints))
                inst[0] = np.array([float(i) for i in line_parts[:-1]])

            instance_list.append(inst)
            class_val_list.append(label)
    return np.asarray(instance_list), np.asarray(class_val_list)


def _load_txt_uea(dataset_path):
    """Load a univariate dataset split from a TXT file.

    Every row of the file holds one time series: the first column is the
    target and the remaining columns are the values of the series. The file
    is read with ``np.loadtxt`` using its default (whitespace) delimiter.

    Parameters
    ----------
    dataset_path: string of dataset_path
        Path to the TXT file to be read

    Returns
    -------
    x: np array of shape (n_timeseries, n_timestamps, n_features)
        Time series dataset
    y: np array of shape (n_timeseries, )
        Vector of targets, cast to int

    Raises
    ------
    Exception: on any failure, e.g. if the given file does not exist or is
               corrupted
    """
    # ndmin=2 keeps a file with a single row two-dimensional; without it the
    # column slicing below fails on the 1-D array loadtxt would return.
    data = np.loadtxt(dataset_path, ndmin=2)
    X = to_time_series_dataset(data[:, 1:])
    y = data[:, 0].astype(int)
    return X, y