theia-log/theia

View on GitHub
theia/cli/simulate.py

Summary

Maintainability
A
0 mins
Test Coverage
"""
------------------
theia.cli.simulate
------------------

Event simulation tool.
"""


import asyncio
import time
from random import randint, shuffle
from uuid import uuid4
from datetime import datetime
from threading import Thread
import argparse

import lorem

from theia.comm import Client
from theia.model import Event, EventSerializer


def get_parser():
    """Build the command line parser of the event simulation tool.

    The options are declared in a table and registered in declaration order,
    so the generated help lists them in that same order.

    :returns: a ready to use :class:`argparse.ArgumentParser`.
    """
    simulate_parser = argparse.ArgumentParser(
        prog='theia.cli.simulate',
        description='Simulate events (debugging)',
    )

    # (flags, keyword arguments) pairs handed over to ``add_argument``.
    option_table = (
        (('-H', '--host'),
         {'default': 'localhost', 'dest': 'host', 'help': 'Collector host'}),
        (('-p', '--port'),
         {'default': 6433, 'dest': 'port', 'help': 'Collector port', 'type': int}),
        (('-t',),
         {'nargs': '*', 'dest': 'tags', 'help': 'Set of tags to choose from'}),
        (('-s',),
         {'nargs': '*', 'dest': 'sources',
          'help': 'Set of event sources to choose from'}),
        (('-c',),
         {'dest': 'content', 'default': None,
          'help': 'Use this event content instead of random content.'}),
        (('--content-size',),
         {'dest': 'content_size', 'type': int, 'default': None,
          'help': 'Size of content (approximately)'}),
        (('--delay',),
         {'dest': 'delay', 'default': 1.0, 'type': float,
          'help': 'Delay between event in seconds'}),
    )

    for flags, settings in option_table:
        simulate_parser.add_argument(*flags, **settings)

    return simulate_parser


def generate_content(content=None, size=None):
    """Return the event content: either the given one or generated text.

    A ``content`` that is not ``None`` is handed back untouched (even when it
    is an empty string). Otherwise the text is assembled from the sentences
    produced by ``lorem.sentence()``, separated by a single space.

    :param str content: optional, fixed content to return as is.
    :param int size: optional, minimal length of the generated content. When
        not given, exactly one sentence is generated. When given, sentences
        are added until the total length is >= ``size``; at least one
        sentence is always produced and sentences are never truncated.

    :returns: ``str``, the content.
    """
    if content is not None:
        return content

    sentences = [lorem.sentence()]
    if size is None:
        return sentences[0]

    # Track the length of the joined text: every extra sentence contributes
    # its own length plus one separating space.
    total_length = len(sentences[0])
    while total_length < size:
        next_sentence = lorem.sentence()
        sentences.append(next_sentence)
        total_length += 1 + len(next_sentence)

    return ' '.join(sentences)


def generate_rand_items(items, default):
    """Pick a random, randomly ordered, non-empty subset of ``items``.

    Special cases:

    * no items at all (``None`` or empty) - a new list holding only
      ``default`` is returned;
    * exactly one item - ``items`` itself is returned, not a copy.

    In every other case the subset size is drawn between 1 and
    ``len(items)`` (both inclusive) and the subset is taken from a shuffled
    copy, so the passed ``items`` is never reordered.

    :param list items: the items to choose from.
    :param default: the only item of the result when ``items`` is empty.

    :returns: :class:`list` with the randomly chosen items.
    """
    if not items:
        return [default]

    available = len(items)
    if available == 1:
        return items

    # Draw the size before shuffling, keeping the order of the random calls.
    subset_size = randint(1, available)

    shuffled = list(items)
    shuffle(shuffled)

    return shuffled[:subset_size]


def generate_rand_event(sources, tags, content, cnt_size):
    """Generate random event from the given choices for sources, tags and content.

    :param list sources: list of choices for sources. One random source will be
        chosen from the list. If ``None`` or empty, the fixed source name
        ``'theia.cli.simulate'`` is used instead.
    :param list tags: list of choices for tags. Random subset with random order
        and size of at least one will be chosen from the given list of tags.
        If no tags are given, the single tag ``'tag-4'`` is used.
    :param str content: if not ``None``, that content will be used. If ``None``,
        random content will be generated.
    :param int cnt_size: generate content of at least this size. The size of the
        generated content may be greater (the content always contains full
        sentences) but it will never be less than ``cnt_size``.

    :returns: a random :class:`theia.model.Event` generated from the given choices
        values.
    """
    if sources:
        source = sources[randint(0, len(sources) - 1)]
    else:
        # The ``-s`` option is not mandatory, so ``sources`` may be ``None``
        # (option omitted) or ``[]`` (option given without values). Both used
        # to crash here (``len(None)`` / ``randint(0, -1)``); fall back to a
        # default source the same way a default tag is used for missing tags.
        source = 'theia.cli.simulate'

    return Event(id=str(uuid4()), source=source, timestamp=datetime.now().timestamp(),
                 tags=generate_rand_items(tags, 'tag-4'),
                 content=generate_content(content, cnt_size))


def _send_event(args, tags, client, serializer):
    """Build one random event, send it and echo it to standard output.

    :param argparse.Namespace args: parsed program arguments; ``sources``,
        ``content`` and ``content_size`` are read from it.
    :param list tags: the tags to choose from for the event.
    :param client: the object whose ``send_event`` is called with the event.
    :param serializer: the object whose ``serialize`` result is decoded as
        UTF-8 and printed, followed by an empty line.
    """
    new_event = generate_rand_event(
        args.sources,
        tags,
        args.content,
        args.content_size,
    )
    client.send_event(new_event)

    # Echo what has just been sent, with a blank line as a separator.
    serialized = serializer.serialize(new_event)
    print(serialized.decode('UTF-8'))
    print()


def simulate_events(args):
    """Connects and generates random events.

    The events are generated according to the given arguments. A background
    thread sends one event every ``args.delay`` seconds while the calling
    thread runs the event loop until the loop is stopped or interrupted.

    :param argparse.Namespace args: the parsed arguments passed to the program.

    """
    loop = asyncio.get_event_loop()
    client = Client(host=args.host, port=args.port, path='/event', loop=loop)

    client.connect()

    ser = EventSerializer()
    # Accept a single tag given as a plain string as well as a list of tags.
    tags = [args.tags] if isinstance(args.tags, str) else args.tags

    class _SenderThread(Thread):
        """Sender Thread. Sends events until ``is_running`` is cleared.
        """

        def __init__(self):
            # Daemon thread: a sender that is sleeping between two events must
            # not keep the process alive once the main thread is done.
            super(_SenderThread, self).__init__(daemon=True)
            self.is_running = False

        def run(self):
            """Generate and send events for as long as ``is_running`` is set.
            """
            # ``is_running`` is deliberately not set here: it is switched on
            # before ``start()``, so a stop request that arrives before this
            # method begins is not overwritten.
            while self.is_running:
                _send_event(args, tags, client, ser)
                time.sleep(args.delay)

    sender_thread = _SenderThread()
    sender_thread.is_running = True
    sender_thread.start()

    try:
        loop.run_forever()
    finally:
        # Whatever ended the loop (stop request, Ctrl-C, error), ask the
        # sender to finish instead of leaving it sending events forever.
        sender_thread.is_running = False


def run_simulate_events():
    """Entry point: parse the command line and start simulating events.

    The arguments are read from the process command line using the parser
    built by ``get_parser`` and then passed on to ``simulate_events``.
    """
    simulate_events(get_parser().parse_args())



# Start the simulation only when the module is executed as a script; importing
# the module does not trigger it.
if __name__ == '__main__':
    run_simulate_events()