async-worker/aiologger

View on GitHub
aiologger/logger.py

Summary

Maintainability
B
6 hrs
Test Coverage
A
98%
import asyncio
import io
import sys
import traceback
from asyncio import AbstractEventLoop, Task
from typing import Iterable, Optional, Callable, Awaitable, List, NamedTuple

from aiologger.filters import StdoutFilter, Filterer
from aiologger.formatters.base import Formatter
from aiologger.handlers.base import Handler
from aiologger.handlers.streams import AsyncStreamHandler
from aiologger.levels import LogLevel, check_level
from aiologger.records import LogRecord
from aiologger.utils import get_current_frame, create_task

# Type alias: a zero-argument callable returning an awaitable that resolves
# to an iterable of handlers.
_HandlerFactory = Callable[[], Awaitable[Iterable[Handler]]]


class _Caller(NamedTuple):
    filename: str
    line_number: int
    function_name: str
    stack: Optional[str]


def o_o():
    """
    Placeholder whose code object reveals this module's source filename.

    ``__file__`` would ordinarily be used for that, but frozen modules don't
    always have ``__file__`` set (see Issue logging#21736). The code object
    of any function defined in this module carries the filename too, so
    this function exists only to be inspected, never to be called.

    Always raises NotImplementedError.
    """
    message = (
        "I shouldn't be called. My only purpose is to provide "
        "the filename from a handy code object."
    )
    raise NotImplementedError(message)


# _srcfile holds the filename of this module's source file, read from the
# code object of o_o (a function defined in this module). It is used when
# walking the stack to find the first caller stack frame: frames whose
# filename equals _srcfile belong to this module and are skipped.
_srcfile = o_o.__code__.co_filename


class Logger(Filterer):
    def __init__(self, *, name="aiologger", level=LogLevel.NOTSET) -> None:
        """
        Create a logger that starts with no handlers and no parent.

        ``name`` is stored on the logger and copied onto every record it
        creates; ``level`` is passed through ``check_level`` before being
        stored as the logger's threshold.
        """
        super().__init__()
        self.name = name
        self.level = check_level(level)

        # Hierarchy: handlers are looked up on this logger and then on each
        # parent in turn, for as long as ``propagate`` stays true.
        self.parent = None
        self.propagate = True
        self.handlers: List[Handler] = []

        # Lifecycle flags.
        self.disabled = False
        self._was_shutdown = False

        # Created lazily; returned for messages below the logger's level.
        self._dummy_task: Optional[Task] = None

    @classmethod
    def with_default_handlers(
        cls,
        *,
        name="aiologger",
        level=LogLevel.NOTSET,
        formatter: Optional[Formatter] = None,
        **kwargs,
    ):
        """
        Build a logger that already has two stream handlers attached.

        The first handler writes to ``sys.stdout`` at DEBUG level and carries
        a ``StdoutFilter``; the second writes to ``sys.stderr`` at WARNING
        level. Both are given the same ``formatter``. Any extra keyword
        arguments are forwarded to the class constructor.
        """
        logger = cls(name=name, level=level, **kwargs)  # type: ignore

        stdout_handler = AsyncStreamHandler(
            stream=sys.stdout,
            level=LogLevel.DEBUG,
            formatter=formatter,
            filter=StdoutFilter(),
        )
        logger.add_handler(stdout_handler)

        stderr_handler = AsyncStreamHandler(
            stream=sys.stderr, level=LogLevel.WARNING, formatter=formatter
        )
        logger.add_handler(stderr_handler)

        return logger

    def find_caller(self, stack_info=False) -> _Caller:
        """
        Locate the first stack frame that lies outside this module.

        Returns a ``_Caller`` with that frame's file name, line number and
        function name. When ``stack_info`` is true, ``stack`` additionally
        holds the formatted stack ending at that frame; otherwise it is
        ``None``. If no such frame can be found, placeholder values are
        returned instead.
        """
        current = get_current_frame()
        # On some versions of IronPython, currentframe() returns None if
        # IronPython isn't run with -X:Frames.
        if current is not None:
            current = current.f_back

        # Walk outwards past every frame whose code lives in this module.
        while (
            hasattr(current, "f_code")
            and current.f_code.co_filename == _srcfile
        ):
            current = current.f_back

        if not hasattr(current, "f_code"):
            # Ran off the end of the stack (or never had a frame at all).
            return _Caller(
                filename="(unknown file)",
                line_number=0,
                function_name="(unknown function)",
                stack=None,
            )

        code = current.f_code
        stack_text = None
        if stack_info:
            with io.StringIO() as buffer:
                buffer.write("Stack (most recent call last):\n")
                traceback.print_stack(current, file=buffer)
                stack_text = buffer.getvalue()
            # Drop the single trailing newline left by print_stack.
            if stack_text.endswith("\n"):
                stack_text = stack_text[:-1]

        return _Caller(
            filename=code.co_filename or "(unknown file)",
            line_number=current.f_lineno,
            function_name=code.co_name,
            stack=stack_text,
        )

    async def call_handlers(self, record):
        """
        Pass a record to all relevant handlers.

        Loop through all handlers for this logger and its parents in the
        logger hierarchy. If no handler was found, raises an error. Stop
        searching up the hierarchy whenever a logger with the "propagate"
        attribute set to zero is found - that will be the last logger
        whose handlers are called.
        """
        c = self
        found = 0
        while c:
            for handler in c.handlers:
                found = found + 1
                if record.levelno >= handler.level:
                    await handler.handle(record)
            if not c.propagate:
                c = None  # break out
            else:
                c = c.parent
        if found == 0:
            raise Exception("No handlers could be found for logger")

    def add_handler(self, handler: Handler) -> None:
        """
        Add the specified handler to this logger.
        """
        if not (handler in self.handlers):
            self.handlers.append(handler)

    def remove_handler(self, handler: Handler) -> None:
        """
        Remove the specified handler from this logger.
        """
        if handler in self.handlers:
            self.handlers.remove(handler)

    async def handle(self, record):
        """
        Call the handlers for the specified record.

        This method is used for unpickled records received from a socket, as
        well as those created locally. Logger-level filtering is applied.
        """
        if (not self.disabled) and self.filter(record):
            await self.call_handlers(record)

    def _log(
        self,
        level,
        msg,
        args,
        exc_info=None,
        extra=None,
        stack_info=False,
        caller: _Caller = None,
    ) -> Task:
        """
        Build a ``LogRecord`` and schedule its handling.

        The record's source location is taken from ``caller`` when one is
        given; otherwise it is looked up with ``find_caller``. An exception
        instance passed as ``exc_info`` is expanded into the
        ``(type, value, traceback)`` tuple form. Returns the task created
        for ``self.handle(record)``.
        """
        unknown_location = ("(unknown file)", 0, "(unknown function)")

        stack_text = None
        if caller is None and _srcfile:  # type: ignore
            # IronPython doesn't track Python frames, so find_caller raises an
            # exception on some versions of IronPython. We trap it here so that
            # IronPython can use logging.
            try:
                located = self.find_caller(stack_info)
                pathname, lineno, func_name, stack_text = located
            except ValueError:  # pragma: no cover
                pathname, lineno, func_name = unknown_location
        elif caller:
            pathname, lineno, func_name, stack_text = caller
        else:  # pragma: no cover
            pathname, lineno, func_name = unknown_location

        if exc_info and isinstance(exc_info, BaseException):
            error = exc_info
            exc_info = (type(error), error, error.__traceback__)

        record = LogRecord(  # type: ignore
            name=self.name,
            level=level,
            pathname=pathname,
            lineno=lineno,
            msg=msg,
            args=args,
            exc_info=exc_info,
            func=func_name,
            sinfo=stack_text,
            extra=extra,
        )
        pending = self.handle(record)
        return create_task(pending)

    def __make_dummy_task(self) -> Task:
        """
        Schedule a coroutine that does nothing and return its task.
        """

        async def _noop(*args, **kwargs):
            return None

        noop_task = create_task(_noop())
        return noop_task

    def is_enabled_for(self, level) -> bool:
        return level >= self.level

    def _make_log_task(self, level, msg, *args, **kwargs) -> Task:
        """
        Creates an asyncio.Task for a msg if logging is enabled for level.
        Returns a dummy task otherwise.
        """
        if not self.is_enabled_for(level):
            if self._dummy_task is None:
                self._dummy_task = self.__make_dummy_task()
            return self._dummy_task

        if kwargs.get("exc_info", False):
            if not isinstance(kwargs["exc_info"], BaseException):
                kwargs["exc_info"] = sys.exc_info()

        return self._log(  # type: ignore
            level, msg, *args, caller=self.find_caller(False), **kwargs
        )

    def debug(self, msg, *args, **kwargs) -> Task:
        """
        Schedule msg to be logged with severity 'DEBUG' and return the task.

        Exception information is attached by passing the keyword argument
        exc_info with a true value, e.g.

        await logger.debug("Houston, we have a %s", "thorny problem", exc_info=1)
        """
        severity = LogLevel.DEBUG
        return self._make_log_task(severity, msg, args, **kwargs)

    def info(self, msg, *args, **kwargs) -> Task:
        """
        Schedule msg to be logged with severity 'INFO' and return the task.

        Exception information is attached by passing the keyword argument
        exc_info with a true value, e.g.

        await logger.info("Houston, we have an interesting problem", exc_info=1)
        """
        severity = LogLevel.INFO
        return self._make_log_task(severity, msg, args, **kwargs)

    def warning(self, msg, *args, **kwargs) -> Task:
        """
        Schedule msg to be logged with severity 'WARNING' and return the task.

        Exception information is attached by passing the keyword argument
        exc_info with a true value, e.g.

        await logger.warning("Houston, we have a bit of a problem", exc_info=1)
        """
        severity = LogLevel.WARNING
        return self._make_log_task(severity, msg, args, **kwargs)

    # ``warn`` is an alias bound to the same function as ``warning``.
    warn = warning

    def error(self, msg, *args, **kwargs) -> Task:
        """
        Schedule msg to be logged with severity 'ERROR' and return the task.

        Exception information is attached by passing the keyword argument
        exc_info with a true value, e.g.

        await logger.error("Houston, we have a major problem", exc_info=1)
        """
        severity = LogLevel.ERROR
        return self._make_log_task(severity, msg, args, **kwargs)

    def critical(self, msg, *args, **kwargs) -> Task:
        """
        Schedule msg to be logged with severity 'CRITICAL' and return the task.

        Exception information is attached by passing the keyword argument
        exc_info with a true value, e.g.

        await logger.critical("Houston, we have a major disaster", exc_info=1)
        """
        severity = LogLevel.CRITICAL
        return self._make_log_task(severity, msg, args, **kwargs)

    # ``fatal`` is an alias bound to the same function as ``critical``.
    fatal = critical

    def exception(self, msg, *args, exc_info=True, **kwargs) -> Task:
        """
        Convenience method for logging an ERROR with exception information.
        """
        return self.error(msg, *args, exc_info=exc_info, **kwargs)

    async def shutdown(self):
        """
        Perform any cleanup actions in the logging system (e.g. flushing
        buffers).

        Should be called at application exit.
        """
        if self._was_shutdown:
            return
        self._was_shutdown = True
        await self._do_shutdown()

    async def _do_shutdown(self):
        """
        Does actual shutdown
        """
        for handler in reversed(self.handlers):
            if not handler:
                continue
            try:
                if handler.initialized:
                    await handler.flush()
                    await handler.close()

            except Exception:
                """
                Ignore errors which might be caused
                because handlers have been closed but
                references to them are still around at
                application exit. Basically ignore everything,
                as we're shutting down
                """
                pass