then/components/command.py

Summary

Maintainability
A
25 mins
Test Coverage
import os
import random
import subprocess
from pathlib import Path
from typing import Union

from dataclasses import dataclass

from then.components.base import Component, Message
from then.exceptions import ExecuteError, ValidationError, ConfigError

EXECUTE_SHELL_PARAM = '-c'


"""Command component service
"""



def get_shell(name='bash'):
    """Absolute path to command

    :param str name: command
    :return: command args
    :rtype: list
    """
    if name.startswith('/'):
        return [name]
    return ['/usr/bin/env', name]


def get_execute_command(cmd, shell='bash'):
    if not isinstance(cmd, (tuple, list)):
        cmd = [cmd]
    return get_shell(shell) + [EXECUTE_SHELL_PARAM, ' '.join(cmd)]


def run_as_cmd(cmd, user, shell=None):
    """Get the arguments to execute a command as a user

    :param str cmd: command to execute
    :param user: User for use
    :param shell: Bash, zsh, etc.
    :return: arguments
    :rtype: list
    """
    shell = shell or 'bash'
    if not user:
        return get_execute_command(cmd, shell)
    return ['sudo', '-s', '--set-home', '-u', user] + get_execute_command(cmd, shell)


def execute_cmd(cmd, cwd=None, timeout=5):
    """Excecute command on thread

    :param cmd: Command to execute
    :param cwd: current working directory
    :return: None
    """
    p = subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        p.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        return None
    else:
        stdout, stderr = p.stdout.read(), p.stderr.read()
        stdout, stderr = stdout.decode('utf-8', errors='ignore'), stderr.decode('utf-8', errors='ignore')
        if p.returncode:
            raise ExecuteError('Error running command {}: The error code {} has returned. Stderr: {}'.format(
                ' '.join(cmd), p.returncode, stderr
            ))
        else:
            return stdout, stderr


def execute_over_ssh(cmd, ssh, cwd=None, shell='bash'):
    """Excecute command on remote machine using SSH

    :param cmd: Command to execute
    :param ssh: Server to connect. Port is optional
    :param cwd: current working directory
    :return: None
    """
    port = None
    parts = ssh.split(':', 1)
    if len(parts) > 1:
        port = parts[1]
    quoted_cmd = ' '.join([x.replace("'", """'"'"'""") for x in cmd.split(' ')])
    remote_cmd = ' '.join([
        ' '.join(get_shell(shell)), # /usr/bin/env bash
        ' '.join([EXECUTE_SHELL_PARAM, "'", ' '.join((['cd', cwd, ';'] if cwd else []) + [quoted_cmd]), "'"])],
    )
    return ['ssh', parts[0]] + (['-p', port] if port else []) + ['-C'] + [remote_cmd]


class CommandMessageBase(Message):
    cmd: Union[str, list] = None
    component: 'Command' = None

    def send(self):
        if self.component.ssh:
            cmd = execute_over_ssh(self.get_cmd(), self.component.ssh, self.component.cwd)
            output = execute_cmd(cmd)
        else:
            cmd = run_as_cmd(self.get_cmd(), self.component.user)
            output = execute_cmd(cmd, self.component.cwd)
        if output:
            return output[0]

    def get_cmd(self):
        return self.cmd


@dataclass
class CommandMessage(CommandMessageBase):
    """:class:`CommandMessage` instance created by :class:`Command` component. Create It using::

        from then.components import Command

        message = Command().message(cmd=['ls', '-l'])
        message.send()

    :arg cmd: System Command to execute. List or string
    """
    cmd: Union[str, list]
    component: 'Command' = None


class CommandBase(Component):
    user: str = None
    cwd: str = None
    ssh: str = None

    def __post_init__(self):
        parts = (self.ssh or '').split(':', 1)
        if len(parts) > 1 and not parts[1].isdigit():
            raise ValidationError('Invalid port number on ssh config: {}'.format(parts[1]))


@dataclass
class Command(CommandBase):
    """Create a Command instance to execute a system command::

        from then.components import Command

        Command(user='myuser', cwd='/home/myuser/Desktop')\\
            .send(cmd='ls -l')

    :param user: System user to use. Only available on local system
    :param cwd: Current directory
    :param ssh: Execute command over ssh. Syntax: ``<user>@<machine>[:<port>]``
    """
    user: str = None
    cwd: str = None
    ssh: str = None

    _message_class = CommandMessage


class PathBase(Component):
    action: str = 'ordered'
    pattern: str = '*'
    on_end: str = 'repeat'
    _actions = ['ordered', 'shuffle']
    _on_ends = ['stop', 'repeat']

    def __post_init__(self):
        self._action = self.get_action()
        self._on_end = self.get_on_end()

    def _availables(self, value, name, availables):
        new_action = value.lower()
        if new_action not in availables:
            raise ConfigError('Invalid {} in {}: {}. Availables: {}'.format(
                name, self.__class__.__name__, value, ', '.join(availables)
            ))
        return new_action

    def get_action(self):
        return self._availables(self.action, 'action', self._actions)

    def get_on_end(self):
        return self._availables(self.on_end, 'on_end', self._on_ends)


class PathMessageBase(Message):
    path: str
    component: PathBase = None
    _files = None

    def get_files(self):
        if not os.path.lexists(self.path):
            raise ConfigError('{} path does not exists.'.format(self.path))
        if os.path.isfile(self.path):
            return [self.path]
        elif self.component._action == 'ordered':
            return sorted(self.list_directory())
        elif self.component._action == 'shuffle':
            files = self.list_directory()
            random.shuffle(files)
            return files


    def get_next(self, on_end=None):
        if self._files is None:
            self._files = self.get_files()
        on_end = on_end or self.component._on_end
        try:
            return self._files.pop(0)
        except IndexError:
            if on_end == 'stop':
                raise StopIteration
            elif on_end == 'repeat':
                self._files = None
                return self.get_next('stop')

    def list_directory(self):
        return [str(path.resolve()) for path
                in Path(self.path).glob(self.component.pattern) if path.is_file()]