ArturSpirin/test_junkie

View on GitHub
test_junkie/metrics.py

Summary

Maintainability
A
3 hrs
Test Coverage
import errno
import multiprocessing
import os
import sys
import threading
import time
import traceback
from datetime import datetime

import pkg_resources

from test_junkie.cli.cli import CliUtils

from test_junkie.cli.cli_config import Config
from test_junkie.debugger import LogJunkie
from test_junkie.decorators import DecoratorType
from test_junkie.constants import SuiteCategory, TestCategory, DocumentationLinks


class ClassMetrics(object):
    """
    Keeps the execution metrics of a single suite (class level).

    Tracks the suite status, the retry count, start/end/runtime and, for each of the class and
    test level decorators, lists of durations, exceptions and tracebacks.
    """

    def __init__(self):

        # Every decorator gets its own, independent set of empty record lists
        decorator_records = dict(
            (decorator, {"performance": [], "exceptions": [], "tracebacks": []})
            for decorator in (DecoratorType.BEFORE_CLASS,
                              DecoratorType.BEFORE_TEST,
                              DecoratorType.AFTER_TEST,
                              DecoratorType.AFTER_CLASS))
        self.__stats = {"status": None, "retry": 0, "runtime": 0, "start": None, "end": None}
        self.__stats.update(decorator_records)

    def __copy__(self):
        # Copying hands back the very same object, the metrics are never duplicated
        return self

    def __deepcopy__(self, memo):
        # Same as __copy__: a deep copy still refers to this instance
        return self

    def update_decorator_metrics(self, decorator, start_time, exception=None, trace=None):
        """
        Records one execution of a decorated function.

        :param decorator: key of the decorator the record belongs to
        :param start_time: timestamp (as returned by time.time()) taken when the function started
        :param exception: exception produced by the function, if any
        :param trace: traceback produced by the function, if any
        :return: None
        """
        from test_junkie.objects import Limiter
        records = self.__stats[decorator]
        records["performance"].append(time.time() - start_time)
        records["exceptions"].append(Limiter.parse_exception_object(exception))
        records["tracebacks"].append(Limiter.parse_traceback(trace))

    def update_suite_metrics(self, status, start_time, initiation_error=None):
        """
        Records the outcome of a suite run.

        :param status: status the suite finished with
        :param start_time: timestamp (as returned by time.time()) taken when the suite started
        :param initiation_error: stored only when the suite is ignored and an error was provided
        :return: None
        """
        stats = self.__stats
        stats["status"] = status
        # Cancelled, skipped and ignored suites do not count towards the retries
        not_counted = (SuiteCategory.CANCEL, SuiteCategory.SKIP, SuiteCategory.IGNORE)
        if status not in not_counted:
            stats["retry"] += 1
        stats["start"] = start_time
        stats["end"] = time.time()
        stats["runtime"] = stats["end"] - stats["start"]
        if initiation_error is not None and status in [SuiteCategory.IGNORE]:
            stats["initiation_error"] = initiation_error

    def get_metrics(self):
        """
        :return: DICT with all of the metrics collected so far (the live object, not a copy)
        """
        return self.__stats

    def __get_average_metric(self, decorator, metric):
        """
        :param decorator: key of the decorator to look up
        :param metric: name of the list, recorded for that decorator, to average
        :return: mean of the recorded values or None when there is nothing recorded
        """
        from statistics import mean
        samples = self.get_metrics().get(decorator, {}).get(metric, None)
        if not samples:
            return None
        return mean(samples)

    def get_average_performance_of_after_class(self):
        return self.__get_average_metric(DecoratorType.AFTER_CLASS, "performance")

    def get_average_performance_of_before_class(self):
        return self.__get_average_metric(DecoratorType.BEFORE_CLASS, "performance")

    def get_average_performance_of_after_test(self):
        return self.__get_average_metric(DecoratorType.AFTER_TEST, "performance")

    def get_average_performance_of_before_test(self):
        return self.__get_average_metric(DecoratorType.BEFORE_TEST, "performance")


class TestMetrics(object):
    """
    Keeps the execution metrics of a single test.

    Records are stored as {str(class_param): {str(param): record}} so that every combination of
    class parameter and test parameter is tracked separately.
    """

    def __init__(self):

        self.__stats = {}

    def __copy__(self):
        # Copying hands back the very same object, the metrics are never duplicated
        return self

    def __deepcopy__(self, memo):
        # Same as __copy__: a deep copy still refers to this instance
        return self

    def update_metrics(self, status, start_time, param=None, class_param=None, exception=None,
                       formatted_traceback=None, runtime=None, decorator=None):
        """
        Records one execution of the test itself or of one of its before/after test decorators.

        :param status: status of the execution
        :param start_time: timestamp (as returned by time.time()) taken when the execution started
        :param param: parameter the test ran with, its string form is used as the inner key
        :param class_param: class parameter the test ran with, its string form is used as the outer key
        :param exception: exception produced by the execution, if any
        :param formatted_traceback: traceback produced by the execution, if any
        :param runtime: explicit duration of the test, when None it is derived from start_time
        :param decorator: when provided, the record is stored for that decorator instead of the test
        :return: None
        """

        def blank_record():
            # Brand new record for a combination of parameters that has not been seen before
            return {"status": None,
                    "retry": 0,
                    "performance": [],
                    "exceptions": [],
                    "tracebacks": [],
                    "statuses": [],
                    DecoratorType.BEFORE_TEST: {"performance": [], "exceptions": [], "tracebacks": []},
                    DecoratorType.AFTER_TEST: {"performance": [], "exceptions": [], "tracebacks": []}}

        if runtime is None:
            runtime = time.time() - start_time
        param_key = str(param)
        class_param_key = str(class_param)

        records_by_param = self.__stats.setdefault(class_param_key, {})
        if param_key not in records_by_param:
            records_by_param[param_key] = blank_record()
        record = records_by_param[param_key]

        from test_junkie.objects import Limiter
        if decorator is not None:
            decorator_record = record[decorator]
            decorator_record["performance"].append(time.time() - start_time)
            decorator_record["exceptions"].append(Limiter.parse_exception_object(exception))
            decorator_record["tracebacks"].append(Limiter.parse_traceback(formatted_traceback))
            if record["status"] is None:
                # Until the test itself reports a status, the one passed in for the decorator is used.
                # Once a status is set, decorator updates never overwrite it
                record["status"] = status
            return

        record["performance"].append(runtime)
        record["exceptions"].append(Limiter.parse_exception_object(exception))
        record["tracebacks"].append(Limiter.parse_traceback(formatted_traceback))
        record["retry"] += 1
        record["status"] = status
        record["param"] = param
        record["class_param"] = class_param
        record["statuses"].append(status)

    def get_metrics(self):
        """
        :return: DICT with all of the metrics collected so far (the live object, not a copy)
        """
        return self.__stats


class Aggregator(object):
    """
    Builds summary reports from the metrics of the suites it was created with.

    Every suite is expected to provide the accessors used by the individual reports
    (get_test_objects(), get_feature(), get_class_name(), ...) and every test object a ``metrics``
    attribute whose get_metrics() returns a {class_param: {param: data}} mapping.
    """

    def __init__(self, executed_suites):
        """
        :param executed_suites: iterable of suite objects to aggregate the metrics of
        """
        self.__executed_suites = executed_suites

    @property
    def executed_suites(self):
        return self.__executed_suites

    @staticmethod
    def percentage(total, part):
        """
        :param total: number that represents 100%
        :param part: number to express as a percentage of the total
        :return: STRING, percentage with two decimals, or "0" when there is nothing to divide
        """
        # The total is validated as well, otherwise an empty total would raise ZeroDivisionError
        if part > 0 and total:
            return "{:0.2f}".format(float(part) / float(total) * 100)
        return "0"

    def get_basic_report(self):
        """
        Counts the test statuses overall and per suite.

        :return: DICT {"tests": counters, "suites": {suite object: counters}}
        """

        def get_template():

            return {"total": 0,
                    TestCategory.SUCCESS: 0,
                    TestCategory.FAIL: 0,
                    TestCategory.ERROR: 0,
                    TestCategory.IGNORE: 0,
                    TestCategory.SKIP: 0,
                    TestCategory.CANCEL: 0}

        report = {"tests": get_template(),
                  "suites": {}}

        for suite in self.__executed_suites:
            if suite not in report["suites"]:
                report["suites"].update({suite: get_template()})
            suite_counters = report["suites"][suite]
            for test in suite.get_test_objects():
                test_metrics = test.metrics.get_metrics()
                for class_param_data in test_metrics.values():
                    # Every combination of class parameter and parameter counts as a separate test
                    for param_data in class_param_data.values():
                        report["tests"]["total"] += 1
                        report["tests"][param_data["status"]] += 1
                        suite_counters["total"] += 1
                        suite_counters[param_data["status"]] += 1
        return report

    def get_report_by_features(self):
        """
        :return: DICT {feature: {"_totals_": template, component: template}}
        """
        report = {}
        for suite in self.__executed_suites:
            feature = suite.get_feature()
            if feature not in report:
                report.update({feature: {"_totals_": Aggregator.get_template()}})
            for test in suite.get_test_objects():
                component = test.get_component()
                if component not in report[feature]:
                    report[feature].update({component: Aggregator.get_template()})
                test_metrics = test.metrics.get_metrics()
                Aggregator._update_report(report, test_metrics, feature, component)
        return report

    def get_report_by_tags(self):
        """
        Merges the per tag data of every suite, as returned by suite.get_data_by_tags().

        :return: DICT {tag: template}, data reported under the None tag is filed under "Not Defined"
        """
        report = {}
        for suite in self.__executed_suites:
            tag_metrics = suite.get_data_by_tags()
            for metric, metric_data in tag_metrics.items():
                if metric == "_totals_":
                    continue
                metric = metric if metric is not None else "Not Defined"
                if metric not in report:
                    report.update({metric: Aggregator.get_template()})
                tag_report = report[metric]
                tag_report["performance"].extend(metric_data["performance"])
                # Runs without an exception are recorded as None and are left out of the report
                tag_report["exceptions"].extend(entry for entry in metric_data["exceptions"]
                                                if entry is not None)
                tag_report["retries"].extend(metric_data["retries"])
                tag_report["total"] += metric_data["total"]
                for status in TestCategory.ALL:
                    tag_report[status] += metric_data[status]
        return report

    def get_report_by_owner(self):
        """
        :return: DICT {"_totals_": template, owner: template}
        """
        report = {"_totals_": Aggregator.get_template()}
        for suite in self.__executed_suites:
            for test in suite.get_test_objects():
                owner = test.get_owner()
                if owner not in report:
                    report.update({owner: Aggregator.get_template()})
                test_metrics = test.metrics.get_metrics()
                Aggregator._update_report(report, test_metrics, owner)
        return report

    def get_report_by_suite(self):
        """
        :return: DICT {"_totals_": template, suite class name: template},
                 an entry is created only for suites that have at least one test object
        """
        report = {"_totals_": Aggregator.get_template()}
        for suite in self.__executed_suites:
            suite_name = suite.get_class_name()
            for test in suite.get_test_objects():
                if suite_name not in report:
                    report.update({suite_name: Aggregator.get_template()})
                test_metrics = test.metrics.get_metrics()
                Aggregator._update_report(report, test_metrics, suite_name)
        return report

    @staticmethod
    def _update_report(report, metrics, category, subcategory=0):
        """
        Adds the metrics of one test to the report, updating the entry itself and the matching totals.

        :param report: DICT, report to update in place
        :param metrics: DICT {class_param: {param: data}} as returned by the metrics of a test
        :param category: key of the top level entry in the report
        :param subcategory: when left at 0, report[category] and report["_totals_"] are updated,
                            otherwise report[category][subcategory] and report[category]["_totals_"]
        :return: DICT, the same report object that was passed in
        """
        for class_param_data in metrics.values():
            for data in class_param_data.values():
                # Resolved per record so that empty metrics never touch (or require) the report entries
                if subcategory == 0:
                    entry, totals = report[category], report["_totals_"]
                else:
                    entry, totals = report[category][subcategory], report[category]["_totals_"]

                for duration in data["performance"]:
                    entry["performance"].append(duration)
                    totals["performance"].append(duration)
                for error in data["exceptions"]:
                    # Runs without an exception are recorded as None and are left out of the report
                    if error is not None:
                        entry["exceptions"].append(error)
                        totals["exceptions"].append(error)
                entry["retries"].append(data["retry"])
                totals["retries"].append(data["retry"])
                entry["total"] += 1
                totals["total"] += 1
                entry[data["status"]] += 1
                totals[data["status"]] += 1

        return report

    @staticmethod
    def present_console_output(aggregator):
        """
        Prints the results to the console: totals per status, one line per suite and, for suites
        that were not successful, every run of their unsuccessful tests.

        :param aggregator: Aggregator object to take the basic report from
        :return: None
        """

        def parse_exception(value):
            # Puts every line of the traceback on its own, indented, line
            if value is not None:
                return "".join("\n\t\t  {}".format(line) for line in value.split("\n"))

        report = aggregator.get_basic_report()
        test_report = report["tests"]
        suite_report = report["suites"]
        for status in TestCategory.ALL:
            value = "[{part}/{total} {percent}%] {status}".format(part=test_report[status],
                                                                  total=test_report["total"],
                                                                  status=status.upper(),
                                                                  percent=Aggregator.percentage(test_report["total"],
                                                                                                test_report[status]))
            if test_report[status]:
                value = CliUtils.format_bold_string(value)
            print(value)
        print("")
        for suite, stats in suite_report.items():
            status = suite.metrics.get_metrics()["status"]
            if status is None:  # this means that something went wrong with custom event processing
                status = "*"+SuiteCategory.ERROR
            print(">> [{status}] [{passed}/{total} {rate}%] [{runtime:0.2f}s] {module}.{name}"
                  .format(module=CliUtils.format_bold_string(suite.get_class_module()),
                          name=CliUtils.format_bold_string(suite.get_class_name()),
                          status=CliUtils.format_bold_string(status.upper()),
                          runtime=suite.get_runtime(),
                          rate=Aggregator.percentage(stats["total"], stats[TestCategory.SUCCESS]),
                          passed=stats[TestCategory.SUCCESS],
                          total=stats["total"]))
            if status == SuiteCategory.IGNORE:
                print("\t|__ reason: {error}".format(error=suite.metrics.get_metrics().get("initiation_error", None)))
            if status != SuiteCategory.SUCCESS:
                tests = suite.get_unsuccessful_tests()
                for test in tests:
                    test_metrics = test.metrics.get_metrics()
                    print("\t|__ test: {name}()".format(name=CliUtils.format_bold_string(test.get_function_name())))
                    for class_param, class_param_data in test_metrics.items():
                        # Keys are the string form of the parameters, thus "None" means not parameterized
                        if class_param != "None":
                            print("\t  |__ class parameter: {class_parameter}".format(class_parameter=class_param))
                        for param, param_data in class_param_data.items():
                            if param != "None":
                                print("\t    |__ parameter: {parameter}".format(parameter=param))
                            for index in range(param_data["retry"]):
                                trace = param_data["tracebacks"][index]
                                if trace is not None:
                                    # NOTE(review): on Python 3 encode() produces bytes which
                                    # parse_exception() can't split by a str separator, so the
                                    # fallback is what formats the trace there - confirm intended
                                    try:
                                        trace = ":: Traceback: {}".format(
                                            CliUtils.format_color_string(
                                                parse_exception(
                                                    trace.encode('utf8', errors="replace")
                                                ), "red"
                                            )
                                        )
                                    except Exception:
                                        # Only regular errors trigger the fallback,
                                        # KeyboardInterrupt and SystemExit are left to propagate
                                        trace = ":: Traceback: {}".format(
                                            CliUtils.format_color_string(
                                                parse_exception(
                                                    trace.encode("unicode_escape").decode("utf8")
                                                ), "red"
                                            )
                                        )
                                else:
                                    trace = ""
                                print("\t      |__ run #{num} [{status}] [{runtime:0.2f}s] {trace}"
                                      .format(num=index + 1,
                                              trace=trace,
                                              runtime=param_data["performance"][index],
                                              status=param_data["statuses"][index].upper()))
        print("\n===========================================================")
        print(". Test Junkie {} (Python{}) {} .".format(
            pkg_resources.require("test-junkie")[0].version, sys.version_info[0], DocumentationLinks.DOMAIN)
        )
        print("===========================================================")

    @staticmethod
    def get_template():
        """
        :return: DICT, new empty entry for the reports by feature, tag, owner and suite
        """
        return {"performance": [],
                "exceptions": [],
                "retries": [],
                "total": 0,
                TestCategory.SUCCESS: 0,
                TestCategory.SKIP: 0,
                TestCategory.FAIL: 0,
                TestCategory.CANCEL: 0,
                TestCategory.IGNORE: 0,
                TestCategory.ERROR: 0}

    def get_average_test_runtime(self):
        """
        :return: mean of the average runtimes of every parameterized test run,
                 or None when no runtime was recorded at all
        """
        from statistics import mean
        samples = []
        for suite in self.__executed_suites:
            for test in suite.get_test_objects():
                test_metrics = test.metrics.get_metrics()
                for class_param_data in test_metrics.values():
                    for param_data in class_param_data.values():
                        performance = param_data["performance"]
                        # A record may have no runtimes (e.g. only decorator metrics were
                        # recorded for it) and mean() raises StatisticsError for empty data
                        if performance:
                            samples.append(mean(performance))
        return mean(samples) if samples else None


class ResourceMonitor(threading.Thread):
    """
    Thread that, about once a second, appends a "timestamp, cpu, memory" record with the CPU and
    virtual memory utilization percentages (as reported by psutil) to a file in the root directory
    from the Config.
    """

    def __init__(self):

        # File name carries the creation timestamp in order to get a separate file per monitor
        self.file_path = "{dir}{sep}.resources_{timestamp}".format(dir=Config.get_root_dir(),
                                                                   sep=os.sep,
                                                                   timestamp=time.time())

        threading.Thread.__init__(self)
        # Set by shutdown() in order to stop the sampling loop in run()
        self.exit = multiprocessing.Event()

    def get_file_path(self):
        """
        :return: STRING, path to the file the records are written to
        """
        return self.file_path

    def mkdir_p(self, path):
        """
        Creates the directory, with all of the missing parents, unless it already exists.

        :param path: STRING, directory to create
        :return: None
        :raises OSError: when the directory can't be created for a reason other than
                         it already being there
        """
        try:
            os.makedirs(path)
        except OSError as exc:  # Python >2.5
            already_there = exc.errno == errno.EEXIST and os.path.isdir(path)
            if not already_there:
                raise

    def run(self):
        """
        Creates (or truncates) the records file and keeps appending samples until shutdown() is called.

        :return: None
        """
        import psutil
        self.mkdir_p(Config.get_root_dir())
        with open(self.file_path, "w+") as records:
            records.write("")
        # wait() doubles as the 1 second pause between samples but returns as soon as the event is
        # set, so that no sample is written (and the file is not re-created) after shutdown()
        while not self.exit.wait(1):
            data = "{timestamp}, {cpu}, {memory}\n".format(timestamp=datetime.now(),
                                                           cpu=psutil.cpu_percent(),
                                                           memory=psutil.virtual_memory().percent)
            with open(self.file_path, "a+") as records:
                records.write(data)

    def shutdown(self):
        """
        Signals the sampling loop to stop.

        :return: None
        """
        self.exit.set()

    def cleanup(self):
        """
        Removes the records file, failure to do so is logged and not raised.

        :return: None
        """
        try:
            os.remove(self.file_path)
        except Exception:
            # Best effort, but KeyboardInterrupt and SystemExit are left to propagate
            LogJunkie.error(traceback.format_exc())