mchoji/wtg-seal

View on GitHub
src/wtgseal/maker.py

Summary

Maintainability
A
1 hr
Test Coverage
A
100%
"""Functions to generate locust code.

This module contains functions to generate locust code, *i.e.* it
defines tasks, tasksets and users.

"""

from pathlib import Path
from typing import List, Tuple

from . import __version__ as wtgseal_version
from . import dist_name as wtgseal_dist_name
from . import dist_url as wtgseal_dist_url

CmdDef = Tuple[int, str]
BlockDef = List[CmdDef]


def cmddef_to_str(cmddef: CmdDef, indentby: str = ' ' * 4) -> str:
    """Convert a command definition into a string."""
    level, cmd = cmddef
    return indentby * level + cmd + '\n'


def setup_blank_line(n: int = 1, /) -> BlockDef:  # noqa
    """Generate representation for a given number of blank lines."""
    if type(n) != int:
        raise TypeError("n must be an integer")
    else:
        blank = [(0, '')] * n
        return blank


def setup_header(*, dist: str = wtgseal_dist_name,
                 version: str = wtgseal_version,
                 url: str = wtgseal_dist_url) -> BlockDef:
    """Generate a simple header with dist, version and url information.

    Generate a program header citing the distribution name from where
    it was generated, the current version and the url where one can find
    further information.

    """
    header = []
    header.append((0,
                   f'# locust file generated by {dist} (release {version})'))
    header.append((0, f'# See {url} for more information'))
    return header


def setup_import() -> BlockDef:
    """Generate the import lines for a locust file.

    Generate code to import the modules needed for running a locust
    file.

    Returns
    -------
    BlockDef
        A list of code representation, where each item represents a line
        of import.

    """
    imports = []
    imports.append((0, 'from locust import HttpUser, TaskSet, task'))
    imports.append((0, 'from scipy.stats import pareto'))
    return imports


def setup_csv_stats_interval(t: int, /) -> BlockDef:
    """Generate import and settings for CSV stats interval.

    Generate code to import locust.stats module and set the property
    CSV_STATS_INTERVAL_SEC.

    Parameters
    ----------
    t : {int}
        How frequently (in seconds) CSV data are to be written. This
        has only effect when `--csv-full-history` is passed to locust
        run.

    Returns
    -------
    BlockDef
        A list of code representation, where each item represents a line
        of import.

    """
    if not isinstance(t, int):
        raise TypeError('Argument should be an integer')
    if t < 1:
        raise ValueError('Interval should be greater than zero')
    block = []
    block.append((0, 'import locust.stats'))
    block.append((0, f'locust.stats.CSV_STATS_INTERVAL_SEC = {t}'))
    return block


def setup_task(name: str = 'task0',
               uri: List[str] = ["/", ],
               /, *,
               weight: int = 1,
               group_name: str = None,
               indlevel: int = 0) -> BlockDef:
    """Generate code to define a locust task.

    Generate code to define a locust task according to the given
    parameters.

    Parameters
    ----------
    name : {str}, optional
        The name for the task to be generated (the default is 'task0')
    weight : {int}, optional
        The weight for the generated task (the default is 1)
    uri : {List[str] = ["/", ]}
        A list of URIs, each starting with a backslash
        like "/index.html"
    group_name : str
        Group URLs together in Locust's statistics using this name (the
        default is None, which leads to calculate statistics for each
        URL separately)
    indlevel : {int}, optional
        The indentation level where the task definition should begin
        (the default is 0, which leads to code beginning at the left
        margin)

    Returns
    -------
    List[Tuple[int, str]]
        A list where each item represents a line of code to be
        generated. Each Tuple[int, str] consists of the indentation
        level for the code and the code itself. `None` is returned in
        case `uri` is not an iterable.

    """
    if isinstance(uri, list):
        task = []
        task.append((indlevel, f'@task({weight})'))
        task.append((indlevel, f'def {name}(self):'))
        if group_name is None:
            for req in uri:
                task.append((indlevel + 1,
                             f'self.client.get("{req}")'))
        else:
            for req in uri:
                task.append((indlevel + 1,
                             f'self.client.get("{req}", name="{group_name}")'))
        return task
    else:
        raise TypeError('Parameter uri should be a list')


def setup_taskset(name: str = 'MyTaskSet') -> BlockDef:
    """Generate representation of TaskSet subclass definition."""
    return [(0, f'class {name}(TaskSet):')]


def setup_user(name: str = 'MyUser',
               taskset: str = 'MyTaskSet',
               /, *,
               wait_seed: int = 1,
               weight: int = 1,
               indlevel: int = 0) -> BlockDef:
    """Generate a model for user behaviour.

    Generate a representation for a User subclass, which represents an
    user behaviour. The representation contains the class name, taskset,
    and a wait time based on a Pareto distribution.

    Parameters
    ----------
    name : {str}, optional
        The new class name (the default is 'MyUser').
    taskset : {str}, optional
        The tasksets this class will use (the default is 'MyTaskSet').
    wait_seed : {int}, optional
        The seed to be used for the Pareto distribution from where the
        wait times are retrieved (the default is 1).
    weight : {int}, optional
        The weight for this class. The greater this value, the greater
        the chances this class will be spawned. Only important in case
        you have more than one User subclass and with they be spawned
        at different rates.
    indlevel : {int}, optional
        The indentation level where the task definition should begin
        (the default is 0, which leads to code beginning at the left
        margin).

    Returns
    -------
    List[Tuple[int, str]]
        A list where each item represents a line of code to be
        generated. Each Tuple[int, str] consists of the indentation
        level for the code and the code itself.

    Notes
    -----
    An OFF time is defined in this class by the `wait_time` attribute.
    This attribute defines how long a locust will wait after each task.
    Here, we define it as a pareto distribution with shape 1.4 and
    scale 1.0, based on SURGE implementation [1]_.

    References
    ----------
    .. [1] Barford, P., & Crovella, M. (1998, June). Generating
       representative web workloads for network and server performance
       evaluation. In *Proceedings of the 1998 ACM SIGMETRICS joint
       international conference on Measurement and modeling of computer
       systems* (pp. 151-160).

    See Also
    --------
    locust.Locust

    """
    locust = [(indlevel, f'class {name}(HttpUser):'),
              (indlevel + 1, f'weight = {weight}'),
              (indlevel + 1, f'tasks = [{taskset}]'),
              (indlevel + 1, 'pareto_obj = pareto(b=1.4, scale=1)'),
              (indlevel + 1, f'pareto_obj.random_state = {wait_seed}')]
    locust.extend(setup_blank_line())
    locust.extend([(indlevel + 1, 'def wait_time(self):'),
                   (indlevel + 2, 'return self.pareto_obj.rvs()')])
    return locust


def write_locust(path: Path, filedef: BlockDef,
                 indentby: str = ' ' * 4) -> None:
    """Write python code to file.

    Convert blocks of code definition and generate the actual code,
    writing it into a file.

    Parameters
    ----------
    path : {Path}
        The file to be written.
    filedef : {BlockDef}
        The representation of the code to be written.
    indentby : {str}, optional
        The indentation to be adopted (the default is ' ' * 4)

    """
    if not isinstance(path, Path):
        raise TypeError('Expected a pathlib.Path object')
    if not isinstance(filedef, list):
        raise TypeError('Expected a list of tuples')
    else:
        with path.open(mode='w') as fd:
            for cmddef in filedef:
                fd.write(cmddef_to_str(cmddef, indentby))