src/triage/component/timechop/timechop.py

Summary

Maintainability
A
2 hrs
Test Coverage
import itertools

import verboselogs, logging
logger = verboselogs.VerboseLogger(__name__)

from triage.util.conf import convert_str_to_relativedelta, dt_from_str
from triage.util.structs import AsOfTimeList

from . import utils

# Throughout the code here, we're going to follow an example
# based around the following config:
# {
#
#   feature_start_time: '1995-01-01',
#   feature_end_time: '2017-07-01',
#
#   label_start_time: '2012-01-01',
#   label_end_time: '2017-07-01',
#
#   model_update_frequency: '1year',
#
#   training_label_timespans: ['6month'],
#   test_label_timespans: ['6month'],
#
#   max_training_histories: ['2year'],
#   test_durations: ['3month'],
#
#   training_as_of_date_frequencies='1day',
#   test_as_of_date_frequencies='1month'
#
# }


class Timechop:
    def __init__(
        self,
        feature_start_time,
        feature_end_time,
        label_start_time,
        label_end_time,
        model_update_frequency,
        training_as_of_date_frequencies,
        max_training_histories,
        training_label_timespans,
        test_as_of_date_frequencies,
        test_durations,
        test_label_timespans,
    ):

        '''
        Date strings should follow the format `YYYY-MM-DD`. Date intervals
        should be strings of the Postgres interval input format.

        This class is often used within the Triage experiment pipeline, and
        initialized using parameters from a Triage [experiment config](../../../experiments/experiment-config/#time-splitting)

        Arguments:
            feature_start_time (str): Earliest date included in any feature
            feature_end_time (str): Day after last feature date (all data
                included in features are before this date)
            label_start_time (str): Earliest date for which labels are available
            label_end_time (str): Day AFTER last label date (all dates in any
                model are before this date)
            model_update_frequency (str): how frequently to retrain models
            training_as_of_date_frequencies (str): time between rows for same
                entity in train matrix
            max_training_histories (str): Interval specifying how much history
                for each entity to train on
            training_label_timespans (str): how much time is included in a label
                in the train matrix
            test_as_of_date_frequencies (str): time between rows for same entity
                in test matrix
            test_durations (str): How long into the future to make predictions
                for each entity. Controls the length of time included in a test
                matrix
            test_label_timespans (str): How much time is included in a label
                in the test matrix.
        '''
        self.feature_start_time = dt_from_str(
            feature_start_time
        )
        self.feature_end_time = dt_from_str(
            feature_end_time
        )
        if self.feature_start_time > self.feature_end_time:
            raise ValueError("Feature start time after feature end time.")

        self.label_start_time = dt_from_str(
            label_start_time
        )
        self.label_end_time = dt_from_str(
            label_end_time
        )
        if self.label_start_time > self.label_end_time:
            raise ValueError("Label start time after label end time.")

        self.model_update_frequency = convert_str_to_relativedelta(
            model_update_frequency
        )

        self.training_as_of_date_frequencies = utils.convert_to_list(
            training_as_of_date_frequencies
        )

        self.test_as_of_date_frequencies = utils.convert_to_list(
            test_as_of_date_frequencies
        )

        self.max_training_histories = utils.convert_to_list(max_training_histories)

        self.test_durations = utils.convert_to_list(test_durations)

        self.training_label_timespans = utils.convert_to_list(training_label_timespans)

        self.test_label_timespans = utils.convert_to_list(test_label_timespans)

    def chop_time(self):
        """ Given the attributes of the object, define all train/test splits
        for all combinations of the temporal parameters.

        return:
            list: a list of dictionaries defining train/test splits
        """
        matrix_set_definitions = []
        # in our example, we just have one value for each of these: 6month, 6month, and 3month
        for (
            training_label_timespan,
            test_label_timespan,
            test_duration,
        ) in itertools.product(
            self.training_label_timespans,
            self.test_label_timespans,
            self.test_durations,
        ):
            # calculating the train-test split times starts from the end and walks backwards
            # e.g., train_test_split_times for our example with a 1 year model_update_frequency
            # will be every Oct. 1 from 2012 to 2016 (see comments in the method for details
            # on the calculation):
            # train_test_split_times = [2012-10-01, 2013-10-01, 2014-10-01, 2015-10-01, 2016-10-01]
            logger.spam(
                f"Calculating train/test split times for training prediction span {training_label_timespan}, "
                f"test prediction span {test_label_timespan}, test span {test_duration}"
            )
            train_test_split_times = self.calculate_train_test_split_times(
                training_label_timespan=convert_str_to_relativedelta(
                    training_label_timespan
                ),
                test_label_timespan=convert_str_to_relativedelta(test_label_timespan),
                test_duration=test_duration,
            )
            logger.spam(f"Train/test split times: {train_test_split_times}")

            # handle each training_as_of_date_frequency and max_training_history separately
            # to create matrices for each train_test_split_time.
            # in our example, we only have one value for each: 1day and 2year
            for (
                training_as_of_date_frequency,
                max_training_history,
            ) in itertools.product(
                self.training_as_of_date_frequencies, self.max_training_histories
            ):
                logger.spam(
                    f"Generating matrix definitions for training_as_of_date_frequency {training_as_of_date_frequency}, "
                    f"max_training_history {max_training_history}"
                )
                for train_test_split_time in train_test_split_times:
                    logger.spam(f"Generating matrix definitions for split {train_test_split_time}")
                    matrix_set_definitions.append(
                        self.generate_matrix_definitions(
                            train_test_split_time=train_test_split_time,
                            training_as_of_date_frequency=training_as_of_date_frequency,
                            max_training_history=max_training_history,
                            test_duration=test_duration,
                            training_label_timespan=training_label_timespan,
                            test_label_timespan=test_label_timespan,
                        )
                    )
        return matrix_set_definitions

    def calculate_train_test_split_times(
        self, training_label_timespan, test_label_timespan, test_duration
    ):
        """ Calculate the split times between train and test matrices. All
        label spans in train matrices will end at this time, and this will be
        the first as of time in the respective test matrix.

        Arguments:
            training_label_timespan (dateutil.relativedelta.relativedelta): how much
                time is included in training labels
            test_label_timespan (dateutil.relativedelta.relativedelta): how much time is included in test labels
            test_duration (str): for how long after the end of a training matrix are
                test predictions made

        Returns:
            list: all split times for the temporal parameters
        Raises:
            ValueError: if there are no valid split times in the temporal
            config
        """

        # we always want to be sure we're using the most recent data, so for the splits,
        # we start from the very end of time for which we have labels and walk backwards,
        # ensuring we leave enough of a buffer for the test_label_timespan to get a full
        # set of labels for our last testing as_of_date
        #
        # in our example, last_test_label_time = 2017-07-01 - 6month = 2017-01-01
        last_test_label_time = self.label_end_time - test_label_timespan

        # final label must be able to have feature data associated with it
        if last_test_label_time > self.feature_end_time:
            last_test_label_time = self.feature_end_time
            raise ValueError(
                "Final test label date cannot be after end of feature time."
            )
        logger.spam(f"Final label as of date: {last_test_label_time}")

        # all split times have to allow at least one training label before them
        # e.g., earliest_possible_split_time = max(1995-01-01, 2012-01-01) + 6month = 2012-01-01
        earliest_possible_split_time = training_label_timespan + max(
            self.feature_start_time, self.label_start_time
        )
        logger.spam(f"Earliest possible train/test split time: {earliest_possible_split_time}")

        # last split is the first as of time in the final test matrix
        # that is, starting from the label_end_time, we've walked back by the test_label_timespan
        # (above) to allow a buffer for labels and now we walk back further by the test_duration to
        # ensure we have a full set of test data in the latest test matrix.
        #
        # e.g., last_split_time = 2017-01-01 - 3month = 2016-10-01
        test_delta = convert_str_to_relativedelta(test_duration)
        last_split_time = last_test_label_time - test_delta
        logger.spam(f"Final split time: {last_split_time}")
        if last_split_time < earliest_possible_split_time:
            raise ValueError("No valid train/test split times in temporal config.")

        train_test_split_times = []
        train_test_split_time = last_split_time

        # finally, starting from our last_split_time, simply step backwards by the
        # model_update_frequency until we hit the earliest allowable time to
        # yield the set of train_test_split_times
        #
        # e.g., train_test_split_times for our example with a 1 year model_update_frequency
        # will be every Oct. 1 from 2012 to 2016:
        # train_test_split_times = [2012-10-01, 2013-10-01, 2014-10-01, 2015-10-01, 2016-10-01]
        while train_test_split_time >= earliest_possible_split_time:
            train_test_split_times.insert(0, train_test_split_time)
            train_test_split_time -= self.model_update_frequency

        return train_test_split_times

    # matrix_end_time is now matrix_end_time - label_window
    def calculate_as_of_times(
        self, as_of_start_limit, as_of_end_limit, data_frequency, forward=False
    ):
        """ Given a start and stop time, a frequncy, and a direction, calculate the
        as of times for a matrix.

        Arguments:
            as_of_start_limit (datetime.datetime): the earliest possible as of time for a matrix
            as_of_end_limit (datetime.datetime): the last possible as of time for the matrix
            data_frequency (str): The time interval that should pass between rows
                of a single entity. Of the format `'date unit'`. For example,
                `'1 month'`.
            forward (boolean): whether to generate times forward from the start time
                            (True) or backward from the end time (False)

        return:
            list: list of as of times for the matrix
        """
        logger.spam(f"Calculating as_of_times from {as_of_start_limit} to {as_of_end_limit} using example frequency {data_frequency}")

        as_of_times = []

        # in our example, this will apply to the test matrix with parameters
        #   as_of_start_limit = 2016-10-01, as_of_end_limit = 2017-01-01,
        #   data_frequency = 1month, forward=True
        # so, we'll start at 2016-10-01 and append this to the list of
        # as_of_times, then step forward one month at a time until we hit (but
        # do not include) 2017-01-01, yielding three values:
        #   [2016-10-01, 2016-11-01, 2016-12-01]
        if forward:
            as_of_time = as_of_start_limit
            # essentially a do-while loop for test matrices since
            # identical start and end times should include the first
            # date (e.g., ['2017-01-01', '2017-01-01') should give
            # preference to the inclusive side)
            as_of_times.append(as_of_time)
            as_of_time += data_frequency
            while as_of_time < as_of_end_limit:
                as_of_times.append(as_of_time)
                as_of_time += data_frequency

        # in our example, this will apply to the training matrix with parameters
        #   as_of_start_limit = 2014-04-01, as_of_end_limit = 2016-04-01,
        #   data_frequency = 1day, forward=False
        # so, we'll start from 2016-04-01 and step back by one day at a time
        # appending the results to the list of as_of_times until we hit 2014-04-01
        # (which will also be included)
        else:
            as_of_time = as_of_end_limit
            while as_of_time >= as_of_start_limit:
                as_of_times.insert(0, as_of_time)
                as_of_time -= data_frequency

        return as_of_times

    def generate_matrix_definitions(
        self,
        train_test_split_time,
        training_as_of_date_frequency,
        max_training_history,
        test_duration,
        training_label_timespan,
        test_label_timespan,
    ):
        """ Given a split time and parameters for train and test matrices,
        generate as of times and metadata for the matrices in the split.

        Arguments:
            train_test_split_time (datetime.datetime): the limit of the last label in the matrix
            training_as_of_date_frequency (str): how much time between rows for an entity
                                            in a training matrix
            max_training_history (str): how far back from split do train
                                        as_of_times go
            test_duration (str): how far forward from split do test as_of_times go
            training_label_timespan (str): how much time covered by train labels
            test_label_timespan (str): how much time is covered by test labels

        returns:
            dict: dictionary defining the train and test matrices for a split
        """

        # continuing our example, let's consider the case when this is called for the last
        # train_test_split_time, so the parameters here are:
        #   train_test_split_time = 2016-10-01
        #   training_as_of_date_frequency = 1day
        #   max_training_history = 2year
        #   test_duration = 3month
        #   training_label_timespan = 6month
        #   test_label_timespan = 6month

        # for the example, the train matrix will contain as_of_dates for every day from
        # 2014-04-01 through 2016-04-01, including _both_ endpoints, providing a 6 month
        # buffer between the last as_of_time and the train-test split time for the last
        # set of labels (see comments in the method for details)
        train_matrix_definition = self.define_train_matrix(
            train_test_split_time=train_test_split_time,
            training_label_timespan=training_label_timespan,
            max_training_history=max_training_history,
            training_as_of_date_frequency=training_as_of_date_frequency,
        )

        # for the example, the test matrix will contain three as_of_dates:
        #   [2016-10-01, 2016-11-01, 2016-12-01]
        # since we start at the train_test_split_time (2016-10-01) and walk forward by
        # the test_as_of_date_frequency (1 month) until we've exhausted the test_duration
        # (3 months), exclusive (see comments in the method for details)
        test_matrix_definitions = self.define_test_matrices(
            train_test_split_time=train_test_split_time,
            test_duration=test_duration,
            test_label_timespan=test_label_timespan,
        )

        matrix_set_definition = {
            "feature_start_time": self.feature_start_time,
            "feature_end_time": self.feature_end_time,
            "label_start_time": self.label_start_time,
            "label_end_time": self.label_end_time,
            "train_matrix": train_matrix_definition,
            "test_matrices": test_matrix_definitions,
        }
        logger.spam(f"Matrix definitions for train/test split {train_test_split_time}: {matrix_set_definition}")

        return matrix_set_definition

    def define_train_matrix(
        self,
        train_test_split_time,
        training_label_timespan,
        max_training_history,
        training_as_of_date_frequency,
    ):
        """ Given a split time and the parameters of a training matrix, generate
        the as of times and metadata for a train matrix.

        Arguments:
            train_test_split_time (datetime.datetime): the limit of the last label in the matrix
            training_label_timespan (str): how much time is covered by the labels
            max_training_history (str): how far back from split do as_of_times go
            training_as_of_date_frequency (str): how much time between rows for an entity

        return:
            dict: dictionary containing the temporal parameters and as of times
                  for a train matrix
        """
        logger.debug(f"Generating train matrix definitions for train/test split {train_test_split_time}")
        # for our example, this will be called with:
        #   train_test_split_time = 2016-10-01
        #   training_label_timespan = 6month
        #   max_training_history = 2year
        #   training_as_of_date_frequency = 1day

        # last as of time in the matrix is 1 label span before split to provide
        # enough of a buffer for the label data to avoid spilling into the test
        # matrix and causing a leakage problem.
        #
        # e.g., last_train_as_of_time = 2016-10-01 - 6month = 2016-04-01
        training_prediction_delta = convert_str_to_relativedelta(
            training_label_timespan
        )
        last_train_as_of_time = train_test_split_time - training_prediction_delta

        # earliest time in matrix can't be farther back than the latest of the beginning
        # of label time or the beginning of feature time -- whichever is latest is the
        # limit if the amount of history we want to take would go further back.
        #
        # e.g., 2016-04-01 - 2year = 2014-04-01, which is later than both our
        # label_start_time (2012-01-01) and our feature_start_time (1995-01-01), so we
        # can use earliest_possible_train_as_of_time = 2014-04-01
        max_training_delta = convert_str_to_relativedelta(max_training_history)
        earliest_possible_train_as_of_time = last_train_as_of_time - max_training_delta
        experiment_as_of_time_limit = max(
            self.label_start_time, self.feature_start_time
        )
        if earliest_possible_train_as_of_time < experiment_as_of_time_limit:
            earliest_possible_train_as_of_time = experiment_as_of_time_limit
        logger.spam(f"Earliest possible train as of time: {earliest_possible_train_as_of_time}")

        # with the last as of time and the earliest possible time known,
        # calculate all the as of times for the matrix, stepping backwards
        # from the last as of time (to ensure that we use the latest possible
        # training data even if there's a gap and things don't line up
        # exactly) by the training_as_of_date_frequency
        #
        # for our example, this will give us a list of every day from 2014-04-01
        # through 2016-04-01, including _both_ endpoints
        train_as_of_times = self.calculate_as_of_times(
            as_of_start_limit=earliest_possible_train_as_of_time,
            as_of_end_limit=last_train_as_of_time,
            data_frequency=convert_str_to_relativedelta(training_as_of_date_frequency),
        )
        logger.spam(f"Train as of times: {train_as_of_times}")

        # create a dict of the matrix metadata
        matrix_definition = {
            "first_as_of_time": min(train_as_of_times),
            "last_as_of_time": max(train_as_of_times),
            "matrix_info_end_time": train_test_split_time,
            "as_of_times": AsOfTimeList(train_as_of_times),
            "training_label_timespan": training_label_timespan,
            "training_as_of_date_frequency": training_as_of_date_frequency,
            "max_training_history": max_training_history,
        }

        return matrix_definition

    def define_test_matrices(
        self, train_test_split_time, test_duration, test_label_timespan
    ):
        """ Given a train/test split time and a set of testing parameters,
        generate the metadata and as of times for the test matrices in a split.

        Arguments:
            train_test_split_time (datetime.datetime): the limit of the last label in the matrix
            test_duration (str): how far forward from split do test as_of_times go
            test_label_timespan (str): how much time is covered by test labels

        return:
            list: list of dictionaries defining the test matrices for a split
        """

        # for our example, this will be called with:
        #   train_test_split_time = 2016-10-01
        #   test_duration = 3month
        #   test_label_timespan = 6month

        # the as_of_time_limit is simply the split time plus the test_duration and we
        # can avoid checking here for any issues with the label_end_time or
        # feature_end_time since we've guaranteed that those limits would be
        # satisfied when we calculated the train_test_split_times initially
        #
        # for the example, as_of_time_limit = 2016-10-01 + 3month = 2017-01-01
        # (note as well that this will be treated as an _exclusive_ limit)
        logger.debug(f"Generating test matrix definitions for train/test split {train_test_split_time}")
        test_definitions = []
        test_delta = convert_str_to_relativedelta(test_duration)
        as_of_time_limit = train_test_split_time + test_delta
        logger.spam("All test as of times before %s", as_of_time_limit)

        # calculate the as_of_times associated with each test data frequency
        # for our example, we just have one, 1month
        for test_as_of_date_frequency in self.test_as_of_date_frequencies:
            logger.spam(f"Generating test matrix definitions for test data frequency {test_as_of_date_frequency}")

            # for test as_of_times we step _forwards_ from the train_test_split_time
            # to ensure that we always have a prediction set made immediately after
            # training is done (so, the freshest possible predictions) even if the
            # frequency doesn't divide the test_duration evenly so there's a gap before
            # the as_of_time_limit
            #
            # for our example, this will give three as_of_dates:
            #   [2016-10-01, 2016-11-01, 2016-12-01]
            # since we start at the train_test_split_time (2016-10-01) and walk forward by
            # the test_as_of_date_frequency (1 month) until we've exhausted the test_duration
            # (3 months), exclusive (see comments in the method for details)
            test_as_of_times = self.calculate_as_of_times(
                as_of_start_limit=train_test_split_time,
                as_of_end_limit=as_of_time_limit,
                data_frequency=convert_str_to_relativedelta(test_as_of_date_frequency),
                forward=True,
            )
            logger.spam(f"test as of times: {test_as_of_times}")
            test_definition = {
                "first_as_of_time": train_test_split_time,
                "last_as_of_time": max(test_as_of_times),
                "matrix_info_end_time": max(test_as_of_times)
                + convert_str_to_relativedelta(test_label_timespan),
                "as_of_times": AsOfTimeList(test_as_of_times),
                "test_label_timespan": test_label_timespan,
                "test_as_of_date_frequency": test_as_of_date_frequency,
                "test_duration": test_duration,
            }
            test_definitions.append(test_definition)
        return test_definitions