KarrLab/wc_sim

View on GitHub
wc_sim/utils/time_ordered_list.py

Summary

Maintainability
C
7 hrs
Test Coverage
D
67%
""" Doubly-linked list ordered by time

:Author: Arthur Goldberg <Arthur.Goldberg@mssm.edu>
:Date: 2019-11-20
:Copyright: 2019, Karr Lab
:License: MIT
"""

import math


class DoublyLinkedNode(object):
    """ Node in doubly-linked list that stores timestamped data

    Attributes:
        _time (:obj:`Rational`): time value
        _data (:obj:`type`): the data
        _next (:obj:`DoublyLinkedNode`): next node, if linked
        _prev (:obj:`DoublyLinkedNode`): previous node, if linked
    """

    def __init__(self, time, data):
        """
        Args:
            time (:obj:`Rational`): time value
            data (:obj:`type`): the data at time `time`
        """
        self._time = time
        self._data = data
        self._next = None
        self._prev = None

    def link_on_left(self, right):
        """ Doubly-link this `DoublyLinkedNode` to the left of `right`

        Args:
            right (:obj:`DoublyLinkedNode`): a node to link on the right of this node
        """
        # use 'left' for symmetry
        left = self
        left._next = right
        right._prev = left

    def is_equal(self, other):
        """ Compare `DoublyLinkedNode`\ s

        Args:
            other (:obj:`DoublyLinkedNode`): a node to compare

        Returns:
            :obj:`bool`: `True` if `other` has the same values, otherwise `False`
        """
        if not isinstance(other, self.__class__):
            return False
        return (self._time == other._time and
                self._data == other._data)

    def __str__(self):
        """ Get a string representation

        Returns:
            :obj:`str`: string representation of a `TimeOrderedList`
        """
        rv = []
        for attr in ['_time', '_data']:
            rv.append(f"{attr}: {getattr(self, attr)}")
        if self._prev is None:
            rv.append('_prev is None')
        else:
            rv.append('_prev is not None')
        if self._next is None:
            rv.append('_next is None')
        else:
            rv.append('_next is not None')
        return '\n'.join(rv)


class TimeOrderedList(object):
    """ Doubly-linked list ordered by time

    Attributes:
        _head (:obj:`DoublyLinkedNode`): head node, which always has time = -infinity
        _tail (:obj:`DoublyLinkedNode`): head node, which always has time = +infinity
        _count (:obj:`int`): number of data nodes in this list
    """

    ### internal methods ###
    # be careful using internal methods -- they don't check bad input
    def __init__(self):
        # -inf and +inf sentinels simplify many operations
        self._head = DoublyLinkedNode(float("-inf"), None)
        self._tail = DoublyLinkedNode(float("inf"), None)
        self._head.link_on_left(self._tail)
        self._count = 0

    def _insert_after(self, existing_node, new_node):
        """ Insert `new_node` after node `existing_node`

        Args:
            existing_node (:obj:`DoublyLinkedNode`): node in the doubly-linked list
            new_node (:obj:`DoublyLinkedNode`): node to insert to the right of `existing_node`
        """
        # link new_node in between existing_node and the node on its right
        right_of_new_node = existing_node._next
        existing_node.link_on_left(new_node)
        new_node.link_on_left(right_of_new_node)
        self._count += 1

    def _delete(self, existing_node):
        """ Delete node `existing_node`

        Args:
            existing_node (:obj:`DoublyLinkedNode`): node in the doubly-linked list

        Returns:
            :obj:`DoublyLinkedNode`: the node that was existed, or `None` if the list is empty
        """
        if self.is_empty():
            return None
        # splice out existing_node
        existing_node._prev.link_on_left(existing_node._next)
        self._count -= 1
        return existing_node

    def _check_time(self, method, time):
        """ Check that `time` is a good time

        Args:
            method (:obj:`str`): the calling method that's checking the time
            time (:obj:`Rational`): time value

        Raises:
            :obj:`ValueError`: if `time` isn't an :obj:`int` or :obj:`float`, or is infinite or `NaN`
        """
        # check the time for method
        if not isinstance(time, (int, float)):
            raise ValueError(f"time '{time}' isn't a number")
        if math.isinf(time) or math.isnan(time):
            raise ValueError(f"cannot {method} at time {time}")

    ### external methods ###
    def is_empty(self):
        """ Is the sorted list empty?

        Returns:
            :obj:`bool`: `True` if the list is empty, otherwise `False`
        """
        return self._count == 0

    def clear(self):
        """ Clear this sorted list

        Remove all data nodes.
        """
        self._head.link_on_left(self._tail)
        self._count = 0

    def len(self):
        """ Provide length of this sorted list

        Returns:
            :obj:`int`: number of data nodes in this list
        """
        return self._count

    def insert(self, time, data):
        """ Insert data `data` at time `time`

        Args:
            time (:obj:`Rational`): time value
            data (:obj:`type`): the data at time `time`

        Raises:
            :obj:`ValueError`: if data at time is already in the list
        """
        self._check_time('insert', time)
        left_node = self.find(time)
        if left_node._time == time:
            raise ValueError(f"time {time} already in queue")
        else:
            self._insert_after(left_node, DoublyLinkedNode(time, data))

    def find(self, time):
        """ Find the node with the largest time <= `time`

        Args:
            time (:obj:`Rational`): time value

        Returns:
            :obj:`DoublyLinkedNode`: node with the largest time <= `time`
        """
        self._check_time('find', time)
        node = self._head
        while node._time <= time:
            node = node._next
        return node._prev

    def delete(self, time):
        """ Delete the node at time `time`

        Args:
            time (:obj:`Rational`): time value

        Returns:
            :obj:`DoublyLinkedNode`: the node that's deleted

        Raises:
            :obj:`ValueError`: if no node with time `time` is in the queue
        """
        self._check_time('delete', time)
        left_node = self.find(time)
        if left_node._time == time:
            return self._delete(left_node)
        else:
            raise ValueError(f"no node with time {time} in queue")

    def gc_old_data(self, time, min_to_keep=2):
        """ Garbage collect old nodes and data

        Keep all nodes with time >= `time` and at least `min_to_keep` of the highest time nodes, if
        enough are available. Delete the rest. If the queue has fewer than `min_to_keep` nodes do nothing.
        Recommendation: keep `min_to_keep` large enough to enable any fitting done over nearby data.

        Args:
            time (:obj:`Rational`): time value
            min_to_keep (:obj:`int`): minimum number of nodes to leave in this `TimeOrderedList`

        Returns:
            :obj:`int`: the number of nodes deleted
        """
        self._check_time('gc_old_data', time)
        if self.len() < min_to_keep or self.len() == 0:
            return 0
        node = self._tail._prev
        num_keeping = 0
        # iterate down the list to the highest time node to delete
        while time <= node._time:
            node = node._prev
            num_keeping += 1
        if node == self._head:
            return 0
        # iterate further if not keeping enough, but don't go to the head
        while num_keeping < min_to_keep and node._prev and node._prev._prev:
            node = node._prev
            num_keeping += 1
        highest_to_delete = node
        num_being_deleted = self.len() - num_keeping
        self._count = num_keeping
        # splice around all nodes between head and the node to the right of highest_to_delete
        self._head.link_on_left(highest_to_delete._next)
        return num_being_deleted

    def __str__(self):
        """ Get a string representation

        Returns:
            :obj:`str`: string representation of a `TimeOrderedList`
        """
        rv = []
        rv.append(f'{self.len()} nodes:')
        node = self._head._next
        while node._time < float('inf'):
            rv.append(f'{node._time}: {node._data}')
            node = node._next
        return '\n'.join(rv)


class LinearInterpolationList(TimeOrderedList):
    """ Time ordered list with linear interpolation of data values in nodes
    """

    def read(self, time):
        """ Get the data at time `time`, interpolating if needed

        Linear interpolation is done when the time is between data points.

        Args:
            time (:obj:`Rational`): time value

        Returns:
            :obj:`type`: the estimated data at `time`, or `NaN` if it cannot be estimated
        """
        self._check_time('read', time)
        left_node = self.find(time)
        if left_node._time == time:
            return left_node._data
        # interpolate if possible
        if left_node._next._time == float("inf"):
            # cannot interpolate with no data at times greater than `time` - assume slope == 0
            return left_node._data
        elif left_node._time == -float("inf"):
            # no data available at time <= `time`
            return float('NaN')
        else:
            # get slope between left_node and left_node._next
            slope = (left_node._next._data - left_node._data) / (left_node._next._time - left_node._time)
        # interpolate
        return left_node._data + slope * (time - left_node._time)


class MultiAlgorithmSpeciesPopNode(object):
    """ Data node in a `MultiAlgorithmSpeciesPopHistory`

    Attributes:
        pop_before (:obj:`float`): species population infinitesimally before the time of this node
        pop_after (:obj:`float`): species population infinitesimally after the time of this node
    """

    def __init__(self,  **kwargs):
        """
        Args:
            kwargs (:obj:`dict`, optional): keywords for initialization
                allowed keywords:
                    `pop_before` - if supplied, will be assigned to `self.pop_before`
                    `pop_after` - if supplied, will be assigned to `self.pop_after`
                    `pop` - if supplied, will be assigned to `self.pop_before` and `self.pop_after`
                        if they're not otherwise initialized
        """
        kwargs_mapping = (('pop', 'pop_before'),
                          ('pop', 'pop_after'),
                          ('pop_before', 'pop_before'),
                          ('pop_after', 'pop_after'),
                         )
        self.pop_before = None
        self.pop_after = None
        for keyword, attr_name in kwargs_mapping:
            if keyword in kwargs:
                setattr(self, attr_name, kwargs[keyword])


class MultiAlgorithmSpeciesPopHistory(TimeOrderedList):
    """ Time ordered list with linear interpolation of data values in nodes

    Manage a species population history with step function changes from discrete time
    integration algorithms like SSA, and slope changes from continuous time integration
    algorithms like ODE and dFBA.

    To enable interpolation of population, a `MultiAlgorithmSpeciesPopHistory` must be initialized
    with 1 data node, and never contain fewer than 2 data nodes after a 2nd one is inserted.
    """

    @property
    def _earliest_time(self):
        """ The time of the earliest data in the history
        """
        return self._head._next._time

    @property
    def _latest_time(self):
        """ The time of the latest data in the history
        """
        return self._tail._prev._time

    def _check_time_strict(self, time):
        """ Check that `time` is a good time

        Args:
            time (:obj:`Rational`): time value

        Raises:
            :obj:`ValueError`: if `time` is earlier than the earliest data in the history, or
                later than the latest data in the history
        """
        if time < self._earliest_time:
            raise ValueError(f"time {time} earlier than earliest data {self._earliest_time} in history")
        if self._latest_time < time:
            raise ValueError(f"time {time} later than the latest data {self._latest_time} in history")

    def _slope_between_node_pair(self, left_node):
        """ Get the population slope between `left_node` and the node to its right

        Args:
            left_node (:obj:`DoublyLinkedNode`): node in the history

        Returns:
            :obj:`float`: the rate of population change between `left_node` and the node to its right
        """
        # todo: perhaps raise an error if pops aren't defined or times are infinite
        right_node = left_node._next
        return (right_node._data.pop_before - left_node._data.pop_after) / \
               (right_node._time - left_node._time)

    def slope(self, time, left_node=None):
        """ Get the population slope at time `time`

        Args:
            time (:obj:`Rational`): time value
            left_node (:obj:`DoublyLinkedNode`, optional): node to use to avoid cost of `find`

        Returns:
            :obj:`float`: the estimated population slope at `time`

        Raises:
            :obj:`ValueError`: if `time` is earlier than the earliest data in the history, or
                later than the latest data
        """
        self._check_time('slope', time)
        if self._latest_time < time:
            # slope unknown for time later than latest data in history
            return 0
        self._check_time_strict('slope', time)
        if left_node is None:
            left_node = self.find(time)
        if left_node._time == time:
            if left_node._time == self._earliest_time and left_node._time == self._latest_time:
                # only 1 node in history; cannot compute slope
                return 0
            if left_node._time == self._earliest_time:
                # at least 1 node in history after left_node; return slope between left and next node
                return self._slope_between_node_pair(left_node)
            if left_node._time == self._latest_time:
                # at least 1 node in history before left_node; return slope between prev node and left node
                return self._slope_between_node_pair(left_node._prev)
            # history contains at least 1 node before left_node and 1 node after left_node
            # symmetrically, return slope between prev node and next node
            prev_node = left_node._prev
            next_node = left_node._next
            return (next_node._data.pop_before - prev_node._data.pop_after) / \
                   (next_node._time - next_node._time)

        else:
            # at least 1 node in history after left_node; return slope between left and next node
            return self._slope_between_node_pair(left_node)

    def read(self, time, left_node=None):
        """ Get the data at time `time`, interpolating if needed

        By convention, if time `time` coincides with a discrete population change, then `read`
            returns `pop_before`, the species population infinitesimally before the time
        Linear interpolation is done when adjacent data points are available.

        Args:
            time (:obj:`Rational`): time value
            left_node (:obj:`DoublyLinkedNode`, optional): node to use to avoid cost of `find`

        Returns:
            :obj:`tuple`: the estimated population and slope at `time`
        """
        self._check_time('read', time)
        self._check_time_strict('read', time)
        if left_node is None:
            left_node = self.find(time)
        # interpolate
        slope = self.slope(time, left_node=left_node)
        if left_node._time == time:
            return (left_node._data.pop_before, slope)
        return (slope * (time - left_node._time) + left_node._data.pop_after, slope)

    def insert_discrete_change(self, time, population_change):
        """ Insert a discrete population change at time `time`

        Args:
            time (:obj:`Rational`): time value
            population_change (:obj:`float`): the population change at time `time`
        """
        self._check_time('insert_discrete_change', time)
        left_node = self.find(time)
        if left_node._time != time:
            # if no change exists at time, so add one
            pop, _ = self.read(time, left_node=left_node)
            # change in population is additive
            pop_after = pop + population_change
            kwargs = dict(pop_before=pop,
                          pop_after=pop_after)
            data_node = MultiAlgorithmSpeciesPopNode(**kwargs)
            self._insert_after(left_node, DoublyLinkedNode(time, data_node))
        else:
            # else update the existing change at time
            data_node = left_node.data
            # change in population is additive
            data_node.pop_after = data_node.pop_after + population_change

        # propagate the change
        self.propagate_change(time)

    def insert_continuous_change(self, start_time, end_time, end_population):
        """ Insert a continuous population change that ends at time `end_time`

        Args:
            start_time (:obj:`Rational`): time when the continuous population change started
            end_time (:obj:`Rational`): time when the continuous population change ended
            end_population (:obj:`float`): the species population at time `end_time`

        Raises:
            :obj:`ValueError`: if `end_time` <= `start_time`
        """
        self._check_time('insert_continuous_change: start_time', start_time)
        self._check_time('insert_continuous_change: end_time', end_time)
        if end_time <= start_time:
            raise ValueError(f"end_time {end_time} <= start_time {start_time}")

        # slope of change = (end_population - pop @ end_time) / (end_time - start_time)
        pop_at_end_time, _ = self.read(end_time)
        slope_of_change = (end_population - pop_at_end_time) / (end_time - start_time)
        if slope_of_change == 0:
            return

        # the slope of change is recorded by changing populations in [start_time, end_time]:
        # 1) a node at `start_time` records the current population there
        # 2) all nodes in (start_time, end_time) have their populations changed to reflect the slope
        # 3) a node at `end_time` has pop_before = end_population

        # propagate the change
        self.propagate_change(time, duration=duration)

    def propagate_change(self, time):
        """ Propagate a change made at time `time` to the end of this history

        Args:
            time (:obj:`Rational`): time value
        """
        # todo: write

    def average_pop(self, start_time, end_time):
        """ Determine the average population from `start_time` to `end_time`

        Args:
            start_time (:obj:`Rational`): start time of the average
            end_time (:obj:`Rational`): end time of the average
        """
        # todo: write