theia-log/theia

View on GitHub
theia/model.py

Summary

Maintainability
A
0 mins
Test Coverage
"""
-----------
theia.model
-----------

Theia event data model.

Basic model of an Event, serializers and parsers for Event manipulation.
"""

from time import time
from collections import namedtuple
from io import StringIO, SEEK_CUR
import re


# Immutable record of the three sizes announced on the first line of a
# serialized event.
EventPreamble = namedtuple('EventPreamble', ('total', 'header', 'content'))
"""A preamble to an :class:`Event`.

The preamble exists only in the serialized form of an event. It carries the
size of the whole event and the sizes of its individual parts.
"""

# Per-field documentation. The text is attached to the generated accessors so
# that it shows up in ``help()`` and in the generated API documentation.
EventPreamble.total.__doc__ = '\n    ``int``, the size of the serialized event in bytes.\n'

EventPreamble.header.__doc__ = '\n    ``int``, the size of the serialized event header in bytes.\n'

EventPreamble.content.__doc__ = '\n    ``int``, the size of the serialized event content in bytes.\n'


class Header:
    """Plain container for the metadata part of an :class:`Event`.

    A header carries these attributes:

    * ``id``, unique identifier for the event. Usually UUIDv4.
    * ``timestamp``, floating point time of the event. Events carry this value
      as seconds since the epoch (UNIX time).
    * ``source``, string, the name of the event source.
    * ``tags``, list of strings, arbitrary tags attached to the event.

    The header is mostly useful as an intermediate object while serializing or
    parsing an event.

    :param id: the event identifier, ``None`` if not known yet.
    :param timestamp: the event timestamp, ``None`` if not known yet.
    :param source: the event source, ``None`` if not known yet.
    :param tags: the event tags, ``None`` if not known yet.
    """

    def __init__(self, id=None, timestamp=None, source=None, tags=None):
        # Every attribute is optional: a header may be created empty and have
        # its attributes filled in one by one afterwards.
        self.id, self.timestamp, self.source, self.tags = id, timestamp, source, tags


class Event:
    """Event represents some event occurring at a specific time.

    Each event is uniquely identified by its ``id`` in the whole system. An event
    comes from a ``source`` and always has an associated ``timestamp``. The
    timestamp is usually generated by the event producer.

    The *content* of an event is an arbitrary string. It may be a log file line,
    some generated record, readings from a sensor or other non-structured or
    structured text.

    Each event may have a list of ``tags`` associated with it. These are arbitrary
    strings and help in filtering the events.

    An event may look like this ::

        id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd
        timestamp: 1509989630.6749051
        source:/dev/sensors/door1-sensor
        tags:sensors,home,doors,door1
        Door has been unlocked.

    The constructor takes multiple arguments, of which only the id and source are
    required.

    :param id: ``str``, the event unique identifier. Must be system-wide unique. An
        UUID version 4 (random UUID) would be a good choice for ``id``.
    :param source: ``str``, the source of the event. It usually is the name of the
        monitored file, but if the event does not originate from a file, it should be
        set to the name of the process, system or entity that generated the event.
    :param timestamp: ``float``, time when the event occurred, in seconds since the
        epoch (like UNIX time); the fractional part carries the sub-second
        precision. If ``None`` is given, the current time is used. An explicit
        ``0`` is kept as is.
    :param tags: ``list``, list of ``str`` tags to add to this event. Defaults to
        an empty list.
    :param content: ``str``, the actual content of the event. The content may have
        an arbitrary length (or none at all, in which case it is stored as ``''``).

    """

    def __init__(self, id, source, timestamp=None, tags=None, content=None):
        self.id = id
        self.source = source
        # Only a *missing* timestamp defaults to "now" (seconds since the epoch).
        # Testing truthiness here would silently replace a legitimate timestamp
        # of 0 (the epoch itself) with the current time.
        self.timestamp = time() if timestamp is None else timestamp
        self.tags = tags or []
        self.content = content or ''

    def match(self, id=None, source=None, start=None, end=None, content=None, tags=None):
        """Check if this event matches the provided criteria.

        The event will match only if **all** criteria are satisfied. Calling match
        without any criteria yields ``True``.

        The ``id``, ``source`` and ``content`` criteria are regular expressions
        that are applied to the beginning of the corresponding value - see
        :func:`re.match`. The ``start`` and ``end`` criteria expect numeric values,
        as they operate on the :class:`Event` timestamp. The ``tags`` criteria are
        compared for equality with the event tags.

        :param id: ``str``, regular expression against which to match the :class:`Event`
            ``id``.
        :param source: ``str``, regular expression against which to match the :class:`Event`
            ``source``.
        :param start: ``float`` or ``int``, match true if the :class:`Event` timestamp
            is greater than or equal to this value.
        :param end: ``float`` or ``int``, match true if the :class:`Event` timestamp
            is less than or equal to this value.
        :param content: ``str``, regular expression against which to match the :class:`Event`
            ``content``. An empty or ``None`` value matches any content.
        :param tags: ``list``, list of ``str`` tags that must **all** be present in
            the :class:`Event` tags. Tags are compared literally, not as regular
            expressions.

        :returns: ``True`` if this :class:`Event` matches the criteria, otherwise ``False``.
        """
        return all([self._match_header_id_and_source(id, source),
                    self._match_timestamp(start, end),
                    self._match_tags(tags),
                    self._match_content(content)])

    def _match_header_id_and_source(self, id, source):
        """Match the id and source patterns; a ``None`` pattern is not checked."""
        if id is not None and not _match(id, self.id):
            return False
        if source is not None and not _match(source, self.source):
            return False
        return True

    def _match_timestamp(self, start, end):
        """Check that the timestamp lies within the inclusive [start, end] range.

        A ``None`` bound is open. An event without a timestamp (``None``) is not
        filtered by time at all.
        """
        # Compare against None rather than truthiness so that an event stamped
        # with 0 is still subject to the range check.
        if self.timestamp is None:
            return True
        if start is not None and not self.timestamp >= start:
            return False
        if end is not None and not self.timestamp <= end:
            return False
        return True

    def _match_tags(self, tags):
        """Check that every requested tag is present, literally, on this event."""
        if not tags:
            return True
        return all(tag in self.tags for tag in tags)

    def _match_content(self, content):
        """Match the content pattern; an empty or ``None`` pattern always matches."""
        if not content:
            return True
        return _match(content, self.content)


def _match(pattern, value):
    """Match the value against a regular expression pattern.
    """
    if value is None:
        return False
    return re.match(pattern, value) is not None


class EventSerializer:
    """Serializer for instances of type :class:`Event`.

    An :class:`Event` is turned into a plain text representation, which is then
    encoded (UTF-8 by default) and returned as ``bytes``.

    The representation consists of three parts: preamble, header and content.
    The preamble is the first line of every event and has this format::

        event: <total_size> <header_size> <content_size>

    where:

    * ``total_size`` is the total size of the Event (after the preamble) in bytes.
    * ``header_size`` is the size of the header in bytes.
    * ``content_size`` is the size of the content (after the header) in bytes.

    The header holds the values for the Event's id, timestamp, source and tags.
    Each value is serialized on a single line. The line starts with the name of
    the property, followed by a colon (``:``), then the property value.

    The content starts right after the last header line.

    Here is an example of a fully serialized :class:`Event` (Python's ``bytes``)::

        b'event: 155 133 22\\nid:331c531d-6eb4-4fb5-84d3-ea6937b01fdd\\ntimestamp: 1509989630.6749051\\nsource:/dev/sensors/door1-sensor\\ntags:sensors,home,doors,door1\\nDoor has been unlocked\\n'

    or as a textual representation::

        event: 155 133 22
        id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd
        timestamp: 1509989630.6749051
        source:/dev/sensors/door1-sensor
        tags:sensors,home,doors,door1
        Door has been unlocked

    **Note** that the :class:`EventSerializer` adds a trailing newline (``\\n``) at
    the end. That newline is not counted in any of the preamble sizes.

    :param encoding: ``str``, the encoding of the serialized string. Default ``utf-8``.

    """
    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def serialize(self, event):
        """Serializes an :class:`Event`.

        See :class:`EventSerializer` for details on the serialization format.

        :param event: :class:`Event`, the event to be serialized.

        :returns: the serialized event as ``bytes``.
        """
        header = self._serialize_header(event)
        # Sizes are measured on the encoded form, so they are byte counts and
        # not character counts.
        header_size = len(header.encode(self.encoding))
        content = event.content or ''
        content_size = len(content.encode(self.encoding))
        preamble = 'event: %d %d %d\n' % (header_size + content_size, header_size, content_size)
        return ''.join((preamble, header, content, '\n')).encode(self.encoding)

    def _serialize_header(self, event):
        """Render the header lines (id, timestamp, source, tags) as one ``str``."""
        lines = (
            'id:' + str(event.id),
            'timestamp: %.7f' % event.timestamp,
            'source:' + str(event.source),
            'tags:' + ','.join(event.tags),
        )
        # Every header line, including the last one, is newline terminated.
        return ''.join(line + '\n' for line in lines)


_HEADER_FIELDS = {
    'id': lambda value: value,
    'timestamp': lambda value: float(value),
    'source': lambda value: value,
    'tags': lambda value: value.split(',')
}


class EventParser:
    """Parses an incoming bytes stream into an :class:`Event`.

    Offers methods for parsing parts of an :class:`Event` or parsing the full
    event from the incoming :class:`io.BytesIO` stream.

    The bytes read from the stream are decoded to ``str`` before they are
    interpreted. By default the parser assumes that the stream is ``utf-8``
    encoded.

    :param encoding: ``str``, the encoding to be used for decoding the stream bytes.
        The default is ``utf-8``.

    """
    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def parse_header(self, hdr_size, stream):
        """Parses the :class:`Event` header into a :class:`Header` object from the
        incoming stream.

        First ``hdr_size`` bytes are read from the :class:`io.BytesIO` stream and
        are decoded to ``str``.

        Then, the parser parses each line by splitting it at the first colon (``:``).
        The part before the colon (case-insensitive) selects the :class:`Header`
        property. The part after the colon is the property value.

        :param hdr_size: ``int``, the size of the header in bytes. See
            :meth:`EventParser.parse_preamble` on how to determine the header size
            in bytes.
        :param stream: :class:`io.BytesIO`, the incoming stream to parse.

        :returns: the parsed :class:`Header` for the event.

        Raises :class:`Exception` if fewer than ``hdr_size`` bytes could be read,
        if the header contains a blank line, or if an unknown property is
        encountered in the header. Raises :class:`ValueError` if a header line
        has no colon.
        """
        hbytes = stream.read(hdr_size)
        if len(hbytes) != hdr_size:
            # Adjacent literals are used instead of a backslash continuation
            # inside the string, which would embed the source indentation in
            # the message.
            raise Exception('Invalid read size from buffer. The stream is either unreadable '
                            'or corrupted. %d read, expected %d' % (len(hbytes), hdr_size))

        header = Header()
        with StringIO(hbytes.decode(self.encoding)) as lines:
            for line in lines:
                line = line.strip()
                if not line:
                    raise Exception('Invalid header')
                prop, colon, value = line.partition(':')
                if not colon:
                    # Same exception type str.index() used to raise here, but
                    # with a message that points at the offending line.
                    raise ValueError('Invalid header line, missing colon: [%s]' % line)
                prop = prop.lower()
                parse_field = _HEADER_FIELDS.get(prop)
                if not parse_field:
                    raise Exception('Unknown property in header %s' % prop)
                setattr(header, prop, parse_field(value))
        return header

    def parse_preamble(self, stream):
        """Parses the event preamble from the incoming stream into a :class:`EventPreamble`.

        The event preamble is a single line read from the stream with the following
        structure::

            event: <total_size> <header_size> <content_size>

        where:

        * ``total_size`` is the total size of the Event (after the preamble) in bytes.
        * ``header_size`` is the size of the header in bytes.
        * ``content_size`` is the size of the content (after the header) in bytes.

        Note that the sizes are expressed in bytes. The three values may be
        separated from the ``event:`` marker and from each other by any amount of
        whitespace.

        :param stream: :class:`io.BytesIO`, the stream to parse.

        :returns: the parsed :class:`EventPreamble` from the incoming stream.

        Raises :class:`EOFException` if the stream has no more data. Raises
        :class:`Exception` if the line does not start with ``event:`` or does not
        hold exactly three values, and :class:`ValueError` if a value is not an
        integer.
        """
        raw = stream.readline()
        if not raw:
            raise EOFException()
        pstr = raw.decode(self.encoding).strip()

        if not pstr.startswith('event:'):
            raise Exception('Invalid preamble line: [%s]' % pstr)

        # Split on runs of whitespace rather than on a single space at a fixed
        # offset, so a missing or doubled space does not corrupt the values.
        values = pstr[len('event:'):].split()
        if len(values) != 3:
            raise Exception('Invalid preamble values')

        return EventPreamble(total=int(values[0]), header=int(values[1]), content=int(values[2]))

    def parse_event(self, stream, skip_content=False):
        """Parses an event from the incoming stream.

        The parsing is done in two phases:

        1. The preamble is parsed to determine the total size of the event and
           the size of the event header.
        2. Then the actual event is read, either with or without the event content.

        If ``skip_content`` is set to ``True``, then the actual content of the event
        is not read; the stream is only moved past it. This is useful in event
        readers that do matching of the event header values, without wasting memory
        and performance for reading the content. In this case ``None`` is passed
        as the content of the :class:`Event`.

        :param stream: :class:`io.BytesIO`, the stream to parse from.
        :param skip_content: ``bool``, whether to skip the fetching of the content and
            to fetch only the :class:`Event` header. Default is ``False``.

        :returns: the parsed :class:`Event` from the incoming stream.

        Raises :class:`Exception` if fewer content bytes than declared in the
        preamble could be read.
        """
        preamble = self.parse_preamble(stream)
        header = self.parse_header(preamble.header, stream)
        content = None
        raw_content = None
        if skip_content:
            stream.seek(preamble.content, SEEK_CUR)
        else:
            raw_content = stream.read(preamble.content)

        stream.seek(1, SEEK_CUR)  # new line after each event

        if raw_content is not None:
            # Validate the size on the raw bytes *before* decoding: truncated
            # content may end in the middle of a multi-byte character, and the
            # preamble declares bytes, not characters.
            if len(raw_content) != preamble.content:
                raise Exception('Invalid content size. The stream is either unreadable or corrupted. ' +
                                'Preamble declares %d bytes, but content length is %d' %
                                (preamble.content, len(raw_content)))
            content = raw_content.decode(self.encoding)

        return Event(id=header.id, source=header.source, timestamp=header.timestamp,
                     tags=header.tags, content=content)


class EOFException(Exception):
    """Error raised while parsing an :class:`Event` from an underlying stream
    when the end of the stream is reached prematurely.
    """