whylabs/whylogs-python
python/whylogs/api/logger/logger.py

import atexit
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

from whylogs.api.logger.result_set import (
    ProfileResultSet,
    ResultSet,
    SegmentedResultSet,
)
from whylogs.api.logger.segment_processing import (
    _result_set_for_segment_key_values,
    segment_processing,
)
from whylogs.api.store import ProfileStore
from whylogs.api.writer import Writer, Writers
from whylogs.core import DatasetProfile, DatasetSchema
from whylogs.core.errors import LoggingError
from whylogs.core.input_resolver import _pandas_or_dict
from whylogs.core.metadata import (
    _populate_common_profile_metadata,
    _safe_merge_metadata,
)
from whylogs.core.stubs import pd

logger = logging.getLogger(__name__)


class BasicCache:
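    """Get-or-create cache of DatasetProfiles, keyed by DatasetSchema.

    Note: _cache is a class attribute, so all BasicCache instances share the
    same underlying dictionary.
    """
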
    _cache: Dict[DatasetSchema, DatasetProfile] = {}

    def get(self, schema: DatasetSchema) -> DatasetProfile:
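        """Return the cached profile for schema, creating and caching one on a miss."""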
        candidate = self._cache.get(schema)
        if candidate is None:
            candidate = DatasetProfile(schema=schema)
            self._cache[schema] = candidate
        return candidate


class Logger(ABC):
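    """Abstract base class for whylogs loggers.

    Tracks logged inputs into DatasetProfiles; concrete subclasses decide which
    profiles should track a given input by implementing _get_matching_profiles.
    """
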
    def __init__(self, schema: Optional[DatasetSchema] = None):
        self._is_closed = False
        self._schema = schema
        self._writers: List[Writer] = []
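        # Register close() so the logger is closed automatically at interpreter exit.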
        atexit.register(self.close)
        self._store_list: List[ProfileStore] = []
        self._segment_cache = None
        self._metadata: Optional[Dict[str, str]] = None

    def check_writer(self, _: Writer) -> None:
        """Checks if a writer is configured correctly for this class"""

    def append_writer(self, name: Optional[str] = None, *, writer: Optional[Writer] = None, **kwargs: Any) -> None:
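        """Attach a Writer, either by registered name (resolved through Writers.get)
        or as a prebuilt Writer instance; exactly one of name and writer must be given.
        """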
        if name is None and writer is None:
            raise ValueError("Must specify either the writer name or a Writer object")
        if name is not None and writer is not None:
            raise ValueError("Cannot specify name and writer at the same time")
        if name is not None:
            writer = Writers.get(name, **kwargs)

        assert writer is not None
        self.check_writer(writer)
        self._writers.append(writer)

    def append_store(self, store: ProfileStore) -> None:
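        """Attach a ProfileStore to this logger's store list."""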
        self._store_list.append(store)

    @abstractmethod
    def _get_matching_profiles(
        self,
        obj: Any = None,
        *,
        pandas: Optional[pd.DataFrame] = None,
        row: Optional[Dict[str, Any]] = None,
        schema: Optional[DatasetSchema] = None,
    ) -> List[DatasetProfile]:
        """Return the profile(s) that should track this input; implemented by subclasses."""

    def log(
        self,
        obj: Any = None,
        *,
        pandas: Optional[pd.DataFrame] = None,
        row: Optional[Dict[str, Any]] = None,
        schema: Optional[DatasetSchema] = None,
        timestamp_ms: Optional[int] = None,  # Not the dataset timestamp, but the timestamp of the data
        name: Optional[str] = None,
        trace_id: Optional[str] = None,
        tags: Optional[List[str]] = None,
        segment_key_values: Optional[Dict[str, str]] = None,
    ) -> ResultSet:
        """
        Args:
            timestamp_ms: The timestamp of the data being logged. This defaults to now if it isn't provided.
             This is used to determine what the dataset timestamp should be. For an hourly model, the dataset
             timestamp will end up being the start of the hour of the provided timestamp_ms, UTC.
        """
        if self._is_closed:
            raise LoggingError("Cannot log to a closed logger")
        if obj is None and pandas is None and row is None:
            # TODO: check for shell environment and emit more verbose error string to let user know how to correct.
            raise LoggingError("log() was called without passing in any input!")

        if name is not None:
            if self._metadata is None:
                self._metadata = dict()
            self._metadata["name"] = name
        active_schema = schema or self._schema
        if active_schema:
            # Resolve obj into its pandas/row form once, then clear obj so the
            # schema UDFs and downstream tracking operate on the resolved input.
            pandas, row = _pandas_or_dict(obj, pandas, row)
            obj = None
            pandas, row = active_schema._run_udfs(pandas, row)

        # If segments are defined, use segment_processing to return a SegmentedResultSet
        if active_schema and active_schema.segments:
            segmented_results: SegmentedResultSet = segment_processing(
                schema=active_schema,
                obj=obj,
                pandas=pandas,
                row=row,
                segment_cache=self._segment_cache,
                segment_key_values=segment_key_values,
            )
            # Update the existing segmented_results metadata with the trace_id and other keys if not present
            _populate_common_profile_metadata(segmented_results.metadata, trace_id=trace_id, tags=tags)
            _safe_merge_metadata(default_metadata=segmented_results.metadata, incoming_metadata=active_schema.metadata)
            return segmented_results

        profiles = self._get_matching_profiles(obj, pandas=pandas, row=row, schema=active_schema)

        for prof in profiles:
            prof.track(obj, pandas=pandas, row=row, execute_udfs=False)
            prof._metadata = _populate_common_profile_metadata(prof._metadata, trace_id=trace_id, tags=tags)
            if active_schema:
                _safe_merge_metadata(prof._metadata, active_schema.metadata)

        first_profile = profiles[0]
        if name is not None:
            if first_profile._metadata is None:
                first_profile._metadata = dict()
            first_profile._metadata["name"] = name

        if segment_key_values:
            # Wrap the single profile in a segmented result set keyed by the
            # explicit segment key/value pairs.
            return _result_set_for_segment_key_values(segment_key_values, first_profile)
        return ProfileResultSet(first_profile)

    def close(self) -> None:
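        """Mark the logger closed; subsequent calls to log() raise LoggingError."""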
        self._is_closed = True

    def __enter__(self) -> "Logger":
        return self

    def __exit__(self, exception_type: Any, exception_value: Any, traceback: Any) -> None:
        if not self._is_closed:
            self.close()
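
# Example (illustrative sketch): Logger is abstract, so logging typically goes
# through the top-level whylogs API, which drives a concrete subclass. The exact
# calls below are assumptions for illustration, not guarantees made by this file.
#
#     import pandas as pd
#     import whylogs as why
#
#     df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
#     results = why.log(pandas=df)   # a ResultSet
#     profile = results.profile()    # the underlying DatasetProfile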