kiwitcms/Kiwi

View on GitHub
tcms/rpc/api/testplan.py

Summary

Maintainability
B
6 hrs
Test Coverage
# -*- coding: utf-8 -*-

from django.db.models import Count
from django.forms.models import model_to_dict
from modernrpc.core import REQUEST_KEY, rpc_method

from tcms.management.models import Tag
from tcms.rpc import utils
from tcms.rpc.api.forms.testplan import EditPlanForm, NewPlanForm
from tcms.rpc.decorators import permissions_required
from tcms.testcases.models import TestCase, TestCasePlan
from tcms.testplans.models import TestPlan


@permissions_required("testplans.add_testplan")
@rpc_method(name="TestPlan.create")
def create(values, **kwargs):
    """
    .. function:: RPC TestPlan.create(values)

        Create new Test Plan object and store it in the database.

        :param values: Field values for :class:`tcms.testplans.models.TestPlan`
        :type values: dict
        :param \\**kwargs: Dict providing access to the current request, protocol,
                entry point name and handler instance from the rpc method
        :return: Serialized :class:`tcms.testplans.models.TestPlan` object
        :rtype: dict
        :raises PermissionDenied: if missing *testplans.add_testplan* permission
        :raises ValueError: if data validation fails

        Minimal parameters::

            >>> values = {
                'product': 61,
                'product_version': 93,
                'name': 'Testplan foobar',
                'type': 1,
                'parent': 150,
                'text':'Testing TCMS',
            }
            >>> TestPlan.create(values)
    """
    request = kwargs.get(REQUEST_KEY)

    if not values.get("author"):
        values["author"] = request.user.pk

    if not values.get("is_active"):
        values["is_active"] = True

    form = NewPlanForm(values)
    form.populate(product_id=values["product"])

    if form.is_valid():
        test_plan = form.save()
        result = model_to_dict(test_plan, exclude=["tag"])

        # b/c value is set in the DB directly and if None
        # model_to_dict() will not return it
        result["create_date"] = test_plan.create_date
        return result

    raise ValueError(list(form.errors.items()))


@permissions_required("testplans.view_testplan")
@rpc_method(name="TestPlan.filter")
def filter(query=None):  # pylint: disable=redefined-builtin
    """
    .. function:: RPC TestPlan.filter(query)

        Perform a search and return the resulting list of test plans.

        :param query: Field lookups for :class:`tcms.testplans.models.TestPlan`
        :type query: dict
        :return: List of serialized :class:`tcms.testplans.models.TestPlan` objects
        :rtype: list(dict)
    """

    if query is None:
        query = {}

    return list(
        TestPlan.objects.filter(**query)
        .values(
            "id",
            "name",
            "text",
            "create_date",
            "is_active",
            "extra_link",
            "product_version",
            "product_version__value",
            "product",
            "product__name",
            "author",
            "author__username",
            "type",
            "type__name",
            "parent",
        )
        .annotate(Count("children"))
        .order_by("product", "id")
        .distinct()
    )


@permissions_required("testplans.add_testplantag")
@rpc_method(name="TestPlan.add_tag")
def add_tag(plan_id, tag_name, **kwargs):
    """
    .. function:: RPC TestPlan.add_tag(plan_id, tag_name)

        Add a tag to the specified test plan.

        :param plan_id: PK of TestPlan to modify
        :type plan_id: int
        :param tag_name: Tag name to add
        :type tag_name: str
        :param \\**kwargs: Dict providing access to the current request, protocol,
                entry point name and handler instance from the rpc method
        :raises PermissionDenied: if missing *testplans.add_testplantag* permission
        :raises TestPlan.DoesNotExist: if object specified by PK doesn't exist
        :raises Tag.DoesNotExist: if missing *management.add_tag* permission and *tag_name*
                 doesn't exist in the database!
    """
    request = kwargs.get(REQUEST_KEY)
    tag, _ = Tag.get_or_create(request.user, tag_name)
    TestPlan.objects.get(pk=plan_id).add_tag(tag)


@permissions_required("testplans.delete_testplantag")
@rpc_method(name="TestPlan.remove_tag")
def remove_tag(plan_id, tag_name):
    """
    .. function:: RPC TestPlan.remove_tag(plan_id, tag_name)

        Remove tag from the specified test plan.

        :param plan_id: PK of TestPlan to modify
        :type plan_id: int
        :param tag_name: Tag name to remove
        :type tag_name: str
        :raises PermissionDenied: if missing *testplans.delete_testplantag* permission
        :raises DoesNotExist: if objects specified don't exist
    """
    tag = Tag.objects.get(name=tag_name)
    TestPlan.objects.get(pk=plan_id).remove_tag(tag)


@permissions_required("testplans.change_testplan")
@rpc_method(name="TestPlan.update")
def update(plan_id, values):
    """
    .. function:: RPC TestPlan.update(plan_id, values)

        Update the fields of the selected test plan.

        :param plan_id: PK of TestPlan to modify
        :type plan_id: int
        :param values: Field values for :class:`tcms.testplans.models.TestPlan`
        :type values: dict
        :return: Serialized :class:`tcms.testplans.models.TestPlan` object
        :rtype: dict
        :raises TestPlan.DoesNotExist: if object specified by PK doesn't exist
        :raises PermissionDenied: if missing *testplans.change_testplan* permission
        :raises ValueError: if validations fail
    """
    test_plan = TestPlan.objects.get(pk=plan_id)
    form = EditPlanForm(values, instance=test_plan)
    if form.is_valid():
        test_plan = form.save()
        result = model_to_dict(test_plan, exclude=["tag"])

        # b/c value is set in the DB directly and if None
        # model_to_dict() will not return it
        result["create_date"] = test_plan.create_date
        return result

    raise ValueError(list(form.errors.items()))


@permissions_required("testcases.add_testcaseplan")
@rpc_method(name="TestPlan.add_case")
def add_case(plan_id, case_id):
    """
    .. function:: RPC TestPlan.add_case(plan_id, case_id)

        Link test case to the given plan.

        :param plan_id: PK of TestPlan to modify
        :type plan_id: int
        :param case_id: PK of TestCase to be added to plan
        :type case_id: int
        :return: Serialized :class:`tcms.testcases.models.TestCase` object
                 augmented with a 'sortkey' value
        :rtype: dict
        :raises TestPlan.DoesNotExit or TestCase.DoesNotExist: if objects specified
                 by PKs are missing
        :raises PermissionDenied: if missing *testcases.add_testcaseplan* permission
    """
    plan = TestPlan.objects.get(pk=plan_id)
    case = TestCase.objects.get(pk=case_id)
    test_case_plan = plan.add_case(case)

    result = model_to_dict(case, exclude=["component", "plan", "tag"])
    result["create_date"] = case.create_date
    result["sortkey"] = test_case_plan.sortkey
    return result


@permissions_required("testcases.delete_testcaseplan")
@rpc_method(name="TestPlan.remove_case")
def remove_case(plan_id, case_id):
    """
    .. function:: RPC TestPlan.remove_case(plan_id, case_id)

        Unlink a test case from the given plan.

        :param plan_id: PK of TestPlan to modify
        :type plan_id: int
        :param case_id: PK of TestCase to be removed from plan
        :type case_id: int
        :raises PermissionDenied: if missing *testcases.delete_testcaseplan* permission
    """
    TestCasePlan.objects.filter(case=case_id, plan=plan_id).delete()


@permissions_required("testcases.change_testcaseplan")
@rpc_method(name="TestPlan.update_case_order")
def update_case_order(plan_id, case_id, sortkey):
    """
    .. function:: RPC TestPlan.update_case_order(plan_id, case_id, sortkey)

        Update sortkey which controls display order of the given test case in
        the given test plan.

        :param plan_id: PK of TestPlan holding the selected TestCase
        :type plan_id: int
        :param case_id: PK of TestCase to be modified
        :type case_id: int
        :param sortkey: Ordering value, e.g. 10, 20, 30
        :type sortkey: int
        :raises PermissionDenied: if missing *testcases.delete_testcaseplan* permission
    """
    TestCasePlan.objects.filter(  # pylint:disable=objects-update-used
        case=case_id, plan=plan_id
    ).update(sortkey=sortkey)


@permissions_required("attachments.view_attachment")
@rpc_method(name="TestPlan.list_attachments")
def list_attachments(plan_id, **kwargs):
    """
    .. function:: RPC TestPlan.list_attachments(plan_id)

        List attachments for the given TestPlan.

        :param plan_id: PK of TestPlan to inspect
        :type plan_id: int
        :param \\**kwargs: Dict providing access to the current request, protocol,
                entry point name and handler instance from the rpc method
        :return: A list containing information and download URLs for attachements
        :rtype: list
        :raises TestPlan.DoesNotExit: if object specified by PK is missing
    """
    plan = TestPlan.objects.get(pk=plan_id)
    request = kwargs.get(REQUEST_KEY)
    return utils.get_attachments_for(request, plan)


@permissions_required("attachments.add_attachment")
@rpc_method(name="TestPlan.add_attachment")
def add_attachment(plan_id, filename, b64content, **kwargs):
    """
    .. function:: RPC TestPlan.add_attachment(plan_id, filename, b64content)

        Add attachment to the given TestPlan.

        :param plan_id: PK of TestPlan
        :type plan_id: int
        :param filename: File name of attachment, e.g. 'logs.txt'
        :type filename: str
        :param b64content: Base64 encoded content
        :type b64content: str
        :param \\**kwargs: Dict providing access to the current request, protocol,
                entry point name and handler instance from the rpc method
    """
    utils.add_attachment(
        plan_id,
        "testplans.TestPlan",
        kwargs.get(REQUEST_KEY).user,
        filename,
        b64content,
    )


@permissions_required("testplans.view_testplan")
@rpc_method(name="TestPlan.tree")
def tree(plan_id):
    """
    .. function:: RPC TestPlan.tree(plan_id)

        Returns a list of the ancestry tree for the given TestPlan
        in a depth-first order!

        :param plan_id: PK of TestPlan to inspect
        :type plan_id: int
        :return: A DFS ordered list of all test plans in the family tree
                 starting from the root of the tree
        :rtype: list
        :raises TestPlan.DoesNotExit: if object specified by PK is missing
    """
    plan = TestPlan.objects.get(pk=plan_id)
    result = []

    for record in plan.tree_as_list():
        result.append(
            {
                "id": record.pk,
                "name": record.name,
                "parent_id": record.parent_id,
                "tree_depth": record.tree_depth,
                "url": record.get_full_url(),
            }
        )

    return result