avocado-framework/avocado

View on GitHub
optional_plugins/robot/avocado_robot/runner.py

Summary

Maintainability
A
0 mins
Test Coverage
F
32%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2019
# Authors: Cleber Rosa <crosa@redhat.com>

"""
Avocado nrunner for Robot Framework tests
"""

import io
import multiprocessing
import tempfile

from robot import run

from avocado.core.nrunner.app import BaseRunnerApp
from avocado.core.nrunner.runner import BaseRunner
from avocado.core.utils import messages


class RobotRunner(BaseRunner):

    name = "robot"
    description = "Runner for Robot Framework tests"

    def _uri_to_file_suite_test(self):
        """Converts the uri to a file name, suit name and test name"""
        if not self.runnable.uri:
            return (None, None, None)

        file_name_suite_test = self.runnable.uri.split(":", 1)
        if len(file_name_suite_test) == 1:
            return (file_name_suite_test[0], None, None)

        file_name, suite_test = file_name_suite_test
        suite_name_test_name = suite_test.split(".", 1)
        if len(suite_name_test_name) == 1:
            return (file_name, suite_test, None)
        suite_name, test_name = suite_name_test_name

        return (file_name, suite_name, test_name)

    def _run(self, file_name, suite_name, test_name, queue):
        stdout = io.StringIO()
        stderr = io.StringIO()
        output_dir = tempfile.mkdtemp(prefix=".avocado-robot")
        native_robot_result = run(
            file_name,
            suite=suite_name,
            test=test_name,
            outputdir=output_dir,
            stdout=stdout,
            stderr=stderr,
        )
        if native_robot_result:
            result = "fail"
        else:
            result = "pass"

        stdout.seek(0)
        stderr.seek(0)
        output = self.prepare_status(
            "finished",
            {
                "result": result,
                "stdout": stdout.read().encode(),
                "stderr": stderr.read().encode(),
            },
        )
        stdout.close()
        stderr.close()
        queue.put(output)

    def run(self, runnable):
        # pylint: disable=W0201
        self.runnable = runnable
        file_name, suite_name, test_name = self._uri_to_file_suite_test()
        if not all([file_name, suite_name, test_name]):

            yield messages.FinishedMessage.get("error", fail_reason="Invalid URI given")
            return

        queue = multiprocessing.SimpleQueue()
        process = multiprocessing.Process(
            target=self._run, args=(file_name, suite_name, test_name, queue)
        )
        process.start()
        yield messages.StartedMessage.get()
        yield from self.running_loop(lambda: not queue.empty())

        status = queue.get()
        yield messages.StdoutMessage.get(status["stdout"])
        yield messages.StderrMessage.get(status["stderr"])
        yield messages.FinishedMessage.get(status["result"])


class RunnerApp(BaseRunnerApp):
    PROG_NAME = "avocado-runner-robot"
    PROG_DESCRIPTION = "*nrunner application for robot tests"
    RUNNABLE_KINDS_CAPABLE = ["robot"]


def main():
    app = RunnerApp(print)
    app.run()


if __name__ == "__main__":
    main()