async-worker/aiologger

View on GitHub
aiologger/handlers/streams.py

Summary

Maintainability
A
0 mins
Test Coverage
A
96%
import asyncio
import sys
from asyncio import AbstractEventLoop, StreamWriter
from typing import Union, Optional

from aiologger.utils import get_running_loop
from aiologger.filters import Filter
from aiologger.formatters.base import Formatter
from aiologger.handlers.base import Handler
from aiologger.levels import LogLevel
from aiologger.protocols import AiologgerProtocol
from aiologger.records import LogRecord


class AsyncStreamHandler(Handler):
    terminator = "\n"

    def __init__(
        self,
        stream=None,
        level: Union[str, int, LogLevel] = LogLevel.NOTSET,
        formatter: Formatter = None,
        filter: Filter = None,
    ) -> None:
        super().__init__()
        if stream is None:
            stream = sys.stderr
        self.stream = stream
        self.level = level
        if formatter is None:
            formatter = Formatter()
        self.formatter: Formatter = formatter
        if filter:
            self.add_filter(filter)
        self.protocol_class = AiologgerProtocol
        self._initialization_lock = asyncio.Lock()
        self.writer: Optional[StreamWriter] = None

    @property
    def initialized(self):
        return self.writer is not None

    async def _init_writer(self) -> StreamWriter:
        async with self._initialization_lock:
            if self.writer is not None:
                return self.writer

            loop = get_running_loop()
            transport, protocol = await loop.connect_write_pipe(
                self.protocol_class, self.stream
            )

            self.writer = StreamWriter(  # type: ignore # https://github.com/python/typeshed/pull/2719
                transport=transport, protocol=protocol, reader=None, loop=loop
            )
            return self.writer

    async def handle(self, record: LogRecord) -> bool:
        """
        Conditionally emit the specified logging record.
        Emission depends on filters which may have been added to the handler.
        """
        rv = self.filter(record)
        if rv:
            await self.emit(record)
        return rv

    async def flush(self):
        await self.writer.drain()

    async def emit(self, record: LogRecord):
        """
        Actually log the specified logging record to the stream.
        """
        if self.writer is None:
            self.writer = await self._init_writer()

        try:
            msg = self.formatter.format(record) + self.terminator

            self.writer.write(msg.encode())
            await self.writer.drain()
        except Exception as exc:
            await self.handle_error(record, exc)

    async def close(self):
        """
        Tidy up any resources used by the handler.

        This version removes the handler from an internal map of handlers,
        should ensure that this gets called from overridden close()
        methods.
        """
        if self.writer is None:
            return
        await self.flush()
        self.writer.close()