View on GitHub


3 hrs
Test Coverage
import logging
from datetime import timedelta
from copy import deepcopy
from typing import List

from aw_core.models import Event

logger = logging.getLogger(__name__)

def flood(events: List[Event], pulsetime: float = 5) -> List[Event]:
    Takes a list of events and "floods" any empty space between events by extending one of the surrounding events to cover the empty space.

    For more details on flooding, see this issue:
    # Originally written in aw-research:
    # NOTE: This algorithm has a lot of smaller details that need to be
    #       carefully considered by anyone wishing to edit it, see:
    #        -

    events = deepcopy(events)
    events = sorted(events, key=lambda e: e.timestamp)

    # If negative gaps are smaller than this, prune them to become zero
    negative_gap_trim_thres = timedelta(seconds=0.1)

    warned_about_negative_gap_safe = False
    warned_about_negative_gap_unsafe = False

    for e1, e2 in zip(events[:-1], events[1:]):
        gap = e2.timestamp - (e1.timestamp + e1.duration)

        if not gap:

        # Sanity check in case events overlap
        if gap < timedelta(0) and ==
            # Events with negative gap but same data can safely be merged
            start = min(e1.timestamp, e2.timestamp)
            end = max(e1.timestamp + e1.duration, e2.timestamp + e2.duration)
            e1.timestamp, e1.duration = start, (end - start)
            e2.timestamp, e2.duration = end, timedelta(0)
            if not warned_about_negative_gap_safe:
                    "Gap was of negative duration but could be safely merged ({}s). This message will only show once per batch.".format(
                warned_about_negative_gap_safe = True
        elif gap < -negative_gap_trim_thres and not warned_about_negative_gap_unsafe:
            # Events with negative gap but differing data cannot be merged safely
                "Gap was of negative duration and could NOT be safely merged ({}s). This warning will only show once per batch.".format(
            warned_about_negative_gap_unsafe = True
            # logger.warning("Event 1 (id {}): {} {}".format(, e1.timestamp, e1.duration))
            # logger.warning("Event 2 (id {}): {} {}".format(, e2.timestamp, e2.duration))
        elif -negative_gap_trim_thres < gap <= timedelta(seconds=pulsetime):
            e2_end = e2.timestamp + e2.duration

            # Prioritize flooding from the longer event
            if e1.duration >= e2.duration:
                if ==
                    # Extend e1 to the end of e2
                    # Set duration of e2 to zero (mark to delete)
                    e1.duration = e2_end - e1.timestamp
                    e2.timestamp = e2_end
                    e2.duration = timedelta(0)
                    # Extend e1 to the start of e2
                    e1.duration = e2.timestamp - e1.timestamp
                if ==
                    # Extend e2 to the start of e1, discard e1
                    e2.timestamp = e1.timestamp
                    e2.duration = e2_end - e2.timestamp
                    e1.duration = timedelta(0)
                    # Extend e2 backwards to end of e1
                    e2.timestamp = e1.timestamp + e1.duration
                    e2.duration = e2_end - e2.timestamp

    # Filter out remaining zero-duration events
    events = [e for e in events if e.duration > timedelta(0)]

    return events