kiwitcms/Kiwi

View on GitHub
tcms/telemetry/api.py

Summary

Maintainability
A
3 hrs
Test Coverage
from django.db.models import CharField, Count, Value
from django.db.models.functions import Concat
from django.utils.translation import gettext_lazy as _
from modernrpc.auth.basic import http_basic_auth_login_required
from modernrpc.core import rpc_method

from tcms.testcases.models import TestCase
from tcms.testruns.models import TestExecution, TestExecutionStatus


@http_basic_auth_login_required
@rpc_method(name="Testing.breakdown")
def breakdown(query=None):
    """
    .. function:: RPC Testing.breakdown(query)

        Perform a search and return the statistics for the selected test cases

        :param query: Field lookups for :class:`tcms.testcases.models.TestCase`
        :type query: dict
        :return: Object, containing the statistics for the selected test cases
        :rtype: dict
    """

    if query is None:
        query = {}

    test_cases = TestCase.objects.filter(**query).distinct()

    manual_count = test_cases.filter(is_automated=False).count()
    automated_count = test_cases.filter(is_automated=True).count()
    count = {
        "manual": manual_count,
        "automated": automated_count,
        "all": manual_count + automated_count,
    }

    priorities = _get_field_count_map(test_cases, "priority", "priority__value")
    categories = _get_field_count_map(test_cases, "category", "category__name")

    return {
        "count": count,
        "priorities": priorities,
        "categories": categories,
    }


def _get_field_count_map(test_cases, expression, field):
    query_set_confirmed = (
        test_cases.filter(case_status__is_confirmed=True)
        .values(field)
        .annotate(count=Count(expression, distinct=True))
    )
    query_set_not_confirmed = (
        test_cases.exclude(case_status__is_confirmed=True)
        .values(field)
        .annotate(count=Count(expression, distinct=True))
    )
    return {
        str(_("CONFIRMED")): _map_query_set(query_set_confirmed, field),
        str(_("OTHER")): _map_query_set(query_set_not_confirmed, field),
    }


def _map_query_set(query_set, field):
    return {entry[field]: entry["count"] for entry in query_set}


@http_basic_auth_login_required
@rpc_method(name="Testing.status_matrix")
def status_matrix(query=None):
    """
    .. function:: RPC Testing.status_matrix(query)

        Perform a search and return data_set needed to visualize the status matrix
        of test plans, test cases and test executions

        :param query: Field lookups for :class:`tcms.testcases.models.TestPlan`
        :type query: dict
        :return: A dictionary, containing the information about test executions
        :rtype: dict
    """
    if query is None:
        query = {}

    base_query = TestExecution.objects.filter(**query)

    test_cases = list(
        base_query.values("case_id", "case__summary").order_by("case_id").distinct()
    )
    test_executions = {}
    for execution in base_query.values(
        "pk",
        "case_id",
        "run_id",
        "status_id",
        key=Concat("case_id", Value("-"), "run_id", output_field=CharField()),
    ):
        test_executions[execution["key"]] = execution

    # test run summaries for column hints, keyed by TR.pk
    test_runs = dict(
        base_query.values_list("run_id", "run__summary").order_by("run_id").distinct()
    )
    # test plan IDs, keyed by TR.pk
    test_plans = dict(base_query.values_list("run_id", "run__plan").distinct())

    status_colors = dict(TestExecutionStatus.objects.values_list("pk", "color"))

    return {
        "cases": test_cases,
        "executions": test_executions,
        "plans": test_plans,
        "runs": test_runs,
        "statusColors": status_colors,
    }


@http_basic_auth_login_required
@rpc_method(name="Testing.execution_trends")
def execution_trends(query=None):
    if query is None:
        query = {}

    data_set = {}
    categories = []
    colors = []
    counts = {}

    for status in TestExecutionStatus.objects.all():
        data_set[status.name] = []
        colors.append(status.color)
    data_set[str(_("TOTAL"))] = []
    colors.append("black")

    status_count = {
        "positive": 0,
        "negative": 0,
        "neutral": 0,
    }
    run_id = 0
    for test_execution in (
        TestExecution.objects.filter(**query)
        .select_related("status")
        .order_by("run_id")
    ):
        status = test_execution.status

        if status.weight > 0:
            status_count["positive"] += 1
        elif status.weight < 0:
            status_count["negative"] += 1
        else:
            status_count["neutral"] += 1

        if test_execution.run_id == run_id:
            if status.name in counts:
                counts[status.name] += 1
            else:
                counts[status.name] = 1

        else:
            _append_status_counts_to_result(counts, data_set)

            counts = {status.name: 1}
            run_id = test_execution.run_id
            categories.append(run_id)

    # append the last result
    _append_status_counts_to_result(counts, data_set)

    for _key, value in data_set.items():
        del value[0]

    return {
        "categories": categories,
        "data_set": data_set,
        "colors": colors,
        "status_count": status_count,
    }


def _append_status_counts_to_result(counts, result):
    total = 0
    for status_name in filter(lambda x: x != _("TOTAL"), result.keys()):
        status_count = counts.get(status_name, 0)
        result.get(status_name).append(status_count)

        total += status_count

    result.get(str(_("TOTAL"))).append(total)


@http_basic_auth_login_required
@rpc_method(name="Testing.test_case_health")
def test_case_health(query=None):
    if query is None:
        query = {}

    all_test_executions = TestExecution.objects.filter(**query)

    test_executions = _get_count_for(all_test_executions)
    failed_test_executions = _get_count_for(
        all_test_executions.filter(status__weight__lt=0)
    )

    data = {}
    for value in test_executions:
        data[value["case_id"]] = {
            "case_id": value["case_id"],
            "case_summary": value["case__summary"],
            "count": {"all": value["count"], "fail": 0},
        }

    _count_test_executions(data, failed_test_executions, "fail")

    # remove all with 100% success rate, because they are not interesting
    _remove_all_excellent_executions(data)

    data = list(data.values())
    data.sort(key=_sort_by_failing_rate, reverse=True)

    if len(data) > 30:
        data = data[:30]

    return data


@http_basic_auth_login_required
@rpc_method(name="Testing.individual_test_case_health")
def individual_test_case_health_simple(query=None):
    if query is None:
        query = {}

    res = (
        TestExecution.objects.filter(**query)
        .values("run__plan", "case_id", "status__name", "status__weight")
        .order_by("case", "run__plan", "status__weight")
    )

    return list(res)


def _remove_all_excellent_executions(data):
    for key in dict.fromkeys(data):
        if data[key]["count"]["fail"] == 0:
            data.pop(key)


def _count_test_executions(data, test_executions, status):
    for value in test_executions:
        data[value["case_id"]]["count"][status] = value["count"]


def _sort_by_failing_rate(element):
    return element["count"]["fail"] / element["count"]["all"]


def _get_count_for(test_executions):
    return (
        test_executions.values("case_id", "case__summary")
        .annotate(count=Count("case_id"))
        .order_by("case_id")
    )