homeworkprod/syslog2irc

View on GitHub
src/syslog2irc/syslog.py

Summary

Maintainability
A
0 mins
Test Coverage
"""
syslog2irc.syslog
~~~~~~~~~~~~~~~~~

BSD syslog message reception and handling

:Copyright: 2007-2021 Jochen Kupperschmidt
:License: MIT, see LICENSE for details.
"""

from __future__ import annotations
from functools import partial
import logging
from socketserver import (
    BaseRequestHandler,
    StreamRequestHandler,
    ThreadingTCPServer,
    ThreadingUDPServer,
)
import sys
from typing import Iterable, Union

import syslogmp
from syslogmp import Message as SyslogMessage

from .network import format_port, Port, TransportProtocol
from .signals import syslog_message_received
from .util import start_thread


logger = logging.getLogger(__name__)


class TCPHandler(StreamRequestHandler):
    """Handler for syslog messages arriving via TCP."""

    def __init__(self, port: Port, *args, **kwargs) -> None:
        self.port = port
        super().__init__(*args, **kwargs)

    def handle(self) -> None:
        for line in self.rfile:
            try:
                message = syslogmp.parse(line)
            except ValueError:
                logger.info(
                    'Invalid message received from %s:%d.', *self.client_address
                )
                return None

            _handle_received_message(self.client_address, self.port, message)


class UDPHandler(BaseRequestHandler):
    """Handler for syslog messages arriving via UDP."""

    def __init__(self, port: Port, *args, **kwargs) -> None:
        self.port = port
        super().__init__(*args, **kwargs)

    def handle(self) -> None:
        try:
            data = self.request[0]
            message = syslogmp.parse(data)
        except ValueError:
            logger.info(
                'Invalid message received from %s:%d.', *self.client_address
            )
            return None

        _handle_received_message(self.client_address, self.port, message)


def _handle_received_message(
    client_address: tuple[str, int], port: Port, message: SyslogMessage
) -> None:
    logger.debug(
        'Received message from %s:%d on port %s -> %s',
        client_address[0],
        client_address[1],
        format_port(port),
        format_message_for_log(message),
    )

    syslog_message_received.send(
        port, source_address=client_address, message=message
    )


def create_server(port: Port) -> Union[ThreadingTCPServer, ThreadingUDPServer]:
    """Create a threading server to receive syslog messages."""
    address = ('', port.number)

    if port.transport_protocol == TransportProtocol.TCP:
        tcp_handler_class = partial(TCPHandler, port)
        return ThreadingTCPServer(address, tcp_handler_class)
    elif port.transport_protocol == TransportProtocol.UDP:
        udp_handler_class = partial(UDPHandler, port)
        return ThreadingUDPServer(address, udp_handler_class)
    else:
        raise ValueError(f'Unsupported transport protocol')


def start_server(port: Port) -> None:
    """Start a server, in a separate thread."""
    try:
        server = create_server(port)
    except OSError as e:
        sys.stderr.write(f'Error {e.errno:d}: {e.strerror}\n')
        sys.stderr.write(
            f'Cannot open port {format_port(port)}. Could be already in use. '
            f'Or permission is lacking; try a port number above 1,024 (or '
            'even 4,096) and up to 65,535.\n'
        )
        sys.exit(1)

    thread_name = f'{server.__class__.__name__}-port{port}'
    start_thread(server.serve_forever, thread_name)
    logger.info(
        'Listening for syslog messages on %s:%s.',
        server.server_address[0],
        format_port(port),
    )


def start_syslog_message_receivers(ports: Iterable[Port]) -> None:
    """Start one syslog message receiving server for each port."""
    for port in ports:
        start_server(port)


def format_message_for_log(message: SyslogMessage) -> str:
    """Format a syslog message to be logged."""
    return (
        f'facility={message.facility.name}, '
        f'severity={message.severity.name}, '
        f'timestamp={message.timestamp.isoformat()}, '
        f'hostname={message.hostname}, '
        f'message={message.message}'
    )