whylabs/whylogs-python

View on GitHub
python/whylogs/api/logger/rolling.py

Summary

Maintainability
B
6 hrs
Test Coverage
import atexit
import logging
import math
import os
import time
from datetime import datetime, timezone
from threading import Timer
from typing import Any, Callable, Dict, List, Optional, Type

from typing_extensions import Literal

from whylogs.api.logger.logger import Logger
from whylogs.api.logger.result_set import ProfileResultSet, ResultSet
from whylogs.api.logger.segment_cache import SegmentCache
from whylogs.api.writer import Writer
from whylogs.core import DatasetProfile, DatasetProfileView, DatasetSchema
from whylogs.core.stubs import pd
from whylogs.core.view.segmented_dataset_profile_view import SegmentedDatasetProfileView

logger = logging.getLogger(__name__)


class Scheduler(object):
    """
    Multithreading scheduler.

    Schedule a function to be called repeatedly based on a schedule.
    """

    _TIMER: Type = Timer
    _timer: Any  # Timer

    def __init__(self, initial: float, interval: float, function: Callable, *args: Any, **kwargs: Any):
        self.initial = initial
        self._ran_initial = False
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        self.start()

    def _run(self) -> None:
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self) -> None:
        if not self.is_running:
            interval = self.interval
            if not self._ran_initial:
                interval = self.initial
                self._ran_initial = True
            self._timer = Scheduler._TIMER(interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self) -> None:
        self._timer.cancel()
        self.is_running = False


class TimedRollingLogger(Logger):
    """A rolling logger that continuously rotates files based on time."""

    def __init__(
        self,
        schema: Optional[DatasetSchema] = None,
        *,
        base_name: Optional[str] = None,
        file_extension: Optional[str] = None,
        interval: int = 1,
        when: Literal["S", "M", "H", "D"] = "H",
        utc: bool = False,
        aligned: bool = True,
        fork: bool = False,
        skip_empty: bool = False,
        callback: Optional[Callable[[Writer, DatasetProfileView, str], None]] = None,
        metadata: Optional[Dict[str, str]] = None,
    ):
        super().__init__(schema)
        if base_name is None:
            base_name = "profile"
        if file_extension is None:
            file_extension = ".bin"

        self.file_extension = file_extension
        self.base_name = base_name
        self.aligned = aligned
        self.fork = fork
        self.skip_empty = skip_empty
        self._metadata = metadata

        # base on TimedRotatingFileHandler
        self.when = when.upper()
        if self.when == "S":
            self.interval = 1  # one second
            self.suffix = "%Y-%m-%d_%H-%M-%S"
        elif self.when == "M":
            self.interval = 60  # one minute
            self.suffix = "%Y-%m-%d_%H-%M"
        elif self.when == "H":
            self.interval = 60 * 60  # one hour
            self.suffix = "%Y-%m-%d_%H"
        elif self.when == "D" or self.when == "MIDNIGHT":
            self.interval = 60 * 60 * 24  # one day
            self.suffix = "%Y-%m-%d"
        self.interval = self.interval * interval  # multiply by units requested
        self.utc = utc

        now = time.time()
        self._current_batch_timestamp = self._compute_current_batch_timestamp(now)
        if schema and schema.segments:
            self._segment_cache = SegmentCache(schema)

        self._current_profile: DatasetProfile = DatasetProfile(
            schema=schema,
            dataset_timestamp=datetime.fromtimestamp(self._current_batch_timestamp, timezone.utc),
            metadata=self._metadata,
        )

        initial_run_after = (self._current_batch_timestamp + self.interval) - now
        if initial_run_after <= 0:
            logger.error(
                "Negative initial run after. This shouldn't happen so something went wrong with the clock here"
            )
            initial_run_after = self.interval
        self._callback = callback
        self._scheduler = Scheduler(initial_run_after, interval=self.interval, function=self._do_rollover)

        self._scheduler.start()

        atexit.register(self.close)

    def check_writer(self, writer: Writer) -> None:
        writer.check_interval(self.interval)

    def _compute_current_batch_timestamp(self, now: Optional[float] = None) -> int:
        if now is None:
            now = time.time()
        rounded_now = int(now)
        if self.aligned:
            return int(math.floor((rounded_now - 1) / self.interval)) * self.interval + self.interval
        return rounded_now

    def _get_matching_profiles(
        self,
        obj: Any = None,
        *,
        pandas: Optional[pd.DataFrame] = None,
        row: Optional[Dict[str, Any]] = None,
        schema: Optional[DatasetSchema] = None,
    ) -> List[DatasetProfile]:
        if schema and schema is not self._schema:
            raise ValueError(
                "You cannot pass a DatasetSchema to an instance of TimedRollingLogger.log(),"
                "because schema is set once when instantiated, please use TimedRollingLogger(schema) instead."
            )
        return [self._current_profile]

    def _do_rollover(self) -> None:
        if self._is_closed:
            return

        self._current_batch_timestamp = self._compute_current_batch_timestamp()
        dataset_timestamp = datetime.fromtimestamp(self._current_batch_timestamp, tz=timezone.utc)

        if self._segment_cache:
            self._flush(self._segment_cache.flush(dataset_timestamp))
            return

        old_profile = self._current_profile

        self._current_profile = DatasetProfile(
            schema=self._schema, dataset_timestamp=dataset_timestamp, metadata=self._metadata
        )

        while old_profile.is_active:
            time.sleep(1)

        if self.skip_empty and old_profile.is_empty:
            logger.debug("skip_empty is set for profile. Skipping empty profile.")
            return

        self._flush(ProfileResultSet(old_profile))

    def _get_time_tuple(self) -> time.struct_time:
        if self.utc:
            time_tuple = time.gmtime(self._current_batch_timestamp)
        else:
            time_tuple = time.localtime(self._current_batch_timestamp)
            current_time = int(time.time())

            dst_now = time.localtime(current_time)[-1]
            dst_then = time_tuple[-1]
            if dst_now != dst_then:
                if dst_now:
                    addend = 3600
                else:
                    addend = -3600
                time_tuple = time.localtime(self._current_batch_timestamp + addend)
        return time_tuple

    def _flush(self, results: ResultSet) -> None:
        if results is None:
            logger.debug("The result is None, skipping flush of result set.")
            return
        if results.count == 0:
            logger.debug("The result's count is zero, skipping flush of result set.")
            return
        profiles = results.get_writables()
        if profiles is None:
            logger.debug("The result set's writable is None, skipping flush of result set.")
            return
        number_of_profiles = len(profiles)
        if number_of_profiles == 0:
            logger.debug("The result set's writable list has length zero, skipping flush of result set.")
            return
        logger.debug(f"about to write {number_of_profiles} profiles.")

        pid = 0
        if self.fork:
            pid = os.fork()

        if pid > 0:
            logger.debug("Forked child process. Child process ID: %d", pid)
        else:
            if self.fork:
                logger.debug("In child process")
            else:
                logger.debug("Didn't fork. Writing in the same process")

            time_tuple = self._get_time_tuple()

            profile_count = 0
            for profile in profiles:
                has_segments = isinstance(profile, SegmentedDatasetProfileView)
                if has_segments:
                    timed_filename = f"{self.base_name}.{time.strftime(self.suffix, time_tuple)}_{profile.get_segment_string()}{self.file_extension}"
                else:
                    timed_filename = f"{self.base_name}.{time.strftime(self.suffix, time_tuple)}{self.file_extension}"
                logging.debug("Writing out put with timed_filename: %s", timed_filename)

                profile_count = profile_count + 1
                logger.debug(f"Writing profile {profile_count} of {number_of_profiles}")

                for store in self._store_list:
                    store.write(profile_view=profile, dataset_id=self.base_name)

                for w in self._writers:
                    w.write(file=profile, dest=timed_filename)
                    if self._callback and callable(self._callback):
                        self._callback(w, profile, timed_filename)
                if not self._writers and self._callback and callable(self._callback):
                    self._callback(None, profile, timed_filename)

    def close(self) -> None:
        logging.debug("Closing the writer")
        if not self._is_closed:
            self._scheduler.stop()
            self._do_rollover()
        super(TimedRollingLogger, self).close()