tcms/rpc/api/testrun.py
# -*- coding: utf-8 -*-
from django.forms.models import model_to_dict
from modernrpc.core import REQUEST_KEY, rpc_method
from tcms.management.models import Tag
from tcms.rpc import utils
from tcms.rpc.api.forms.testrun import UpdateForm, UserForm
from tcms.rpc.decorators import permissions_required
from tcms.testcases.models import TestCase
from tcms.testruns.forms import NewRunForm
from tcms.testruns.models import Property, TestExecution, TestRun
@permissions_required("testruns.add_testexecution")
@rpc_method(name="TestRun.add_case")
def add_case(run_id, case_id):
    """
    .. function:: RPC TestRun.add_case(run_id, case_id)

        Attach a TestCase to the selected test run. When the test case is
        already part of the run, its existing executions are returned and
        nothing new is created.

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param case_id: PK of TestCase to be added
        :type case_id: int
        :return: A list of serialized :class:`tcms.testruns.models.TestExecution` objects
        :rtype: list(dict)
        :raises DoesNotExist: if objects specified by the PKs don't exist
        :raises PermissionDenied: if missing *testruns.add_testexecution* permission
        :raises RuntimeError: if test case status is not CONFIRMED
    """
    test_run = TestRun.objects.get(pk=run_id)
    test_case = TestCase.objects.get(pk=case_id)

    # short-circuit: the case is already in this run, hand back what exists
    existing = test_run.executions.filter(case=test_case)
    if existing.exists():
        return annotate_executions_with_properties(existing)

    if not test_case.case_status.is_confirmed:
        raise RuntimeError(f"TC-{test_case.pk} status is not confirmed")

    # new TEs always go at the end of the TR: 10 past the highest sortkey,
    # or plain 10 when the run has no executions yet
    previous = test_run.executions.order_by("sortkey").last()
    sortkey = 10 + previous.sortkey if previous else 10

    created = test_run.create_execution(case=test_case, sortkey=sortkey)
    return annotate_executions_with_properties(created)
def annotate_executions_with_properties(executions_iterable):
    """
    Serialize every execution in ``executions_iterable`` with
    ``model_to_dict()`` and add a ``properties`` key holding the list of
    ``name``/``value`` dicts produced by ``execution.properties()``.

    :param executions_iterable: iterable of execution objects
    :return: list of serialized executions, in iteration order
    :rtype: list(dict)
    """

    def _serialize(execution):
        data = model_to_dict(execution)
        data["properties"] = list(execution.properties().values("name", "value"))
        return data

    return [_serialize(execution) for execution in executions_iterable]
@permissions_required("testruns.delete_testexecution")
@rpc_method(name="TestRun.remove_case")
def remove_case(run_id, case_id):
    """
    .. function:: RPC TestRun.remove_case(run_id, case_id)

        Delete the test execution(s) matching the given test run and
        test case.

        WARNING: this method is deprecated in favor of ``TestExecution.remove()``!
        Nothing in Kiwi TCMS uses it directly and it will be removed in the future!

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param case_id: PK of TestCase to be removed
        :type case_id: int
    """
    matching_executions = TestExecution.objects.filter(run=run_id, case=case_id)
    matching_executions.delete()
@permissions_required("testruns.view_testrun")
@rpc_method(name="TestRun.get_cases")
def get_cases(run_id):
    """
    .. function:: RPC TestRun.get_cases(run_id)

        Return the test cases attached to a test run, each one extended
        with information about its execution inside that run.

        :param run_id: PK of TestRun to inspect
        :type run_id: int
        :return: Serialized list of :class:`tcms.testcases.models.TestCase` objects
                 augmented with ``execution_id`` and ``status`` information.
        :rtype: list(dict)
    """
    case_fields = (
        "id",
        "create_date",
        "is_automated",
        "script",
        "arguments",
        "extra_link",
        "summary",
        "requirement",
        "notes",
        "text",
        "case_status",
        "category",
        "priority",
        "author",
        "default_tester",
        "reviewer",
    )
    cases = list(
        TestCase.objects.filter(executions__run_id=run_id).values(*case_fields)
    )

    # index execution rows by the PK of their test case for the lookup below;
    # a later row for the same case replaces an earlier one
    execution_rows = TestExecution.objects.filter(run_id=run_id).values(
        "case", "pk", "status__name"
    )
    execution_by_case = {row["case"]: row for row in execution_rows.iterator()}

    for case in cases:
        execution = execution_by_case[case["id"]]
        case.update(
            execution_id=execution["pk"],
            status=execution["status__name"],
        )

    return cases
@permissions_required("testruns.add_testruntag")
@rpc_method(name="TestRun.add_tag")
def add_tag(run_id, tag_name, **kwargs):
    """
    .. function:: RPC TestRun.add_tag(run_id, tag)

        Attach a single tag to the specified test run.

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param tag_name: Tag name to add
        :type tag_name: str
        :param \\**kwargs: Dict providing access to the current request, protocol,
                entry point name and handler instance from the rpc method
        :return: Serialized list of :class:`tcms.management.models.Tag` objects
                 currently attached to the test run
        :rtype: list(dict)
        :raises PermissionDenied: if missing *testruns.add_testruntag* permission
        :raises TestRun.DoesNotExist: if object specified by PK doesn't exist
        :raises Tag.DoesNotExist: if missing *management.add_tag* permission and *tag_name*
                 doesn't exist in the database!
    """
    current_user = kwargs.get(REQUEST_KEY).user

    # the tag is resolved (or created) before the test run is looked up
    tag, _created = Tag.get_or_create(current_user, tag_name)

    run = TestRun.objects.get(pk=run_id)
    run.add_tag(tag)

    attached_tags = run.tag.values("id", "name")
    return list(attached_tags)
@permissions_required("testruns.delete_testruntag")
@rpc_method(name="TestRun.remove_tag")
def remove_tag(run_id, tag_name):
    """
    .. function:: RPC TestRun.remove_tag(run_id, tag)

        Detach a tag from the specified test run.

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param tag_name: Tag name to remove
        :type tag_name: str
        :return: Serialized list of :class:`tcms.management.models.Tag` objects
                 still attached to the test run
        :rtype: list(dict)
        :raises PermissionDenied: if missing *testruns.delete_testruntag* permission
        :raises DoesNotExist: if objects specified don't exist
    """
    # the tag is looked up first, then the test run
    unwanted_tag = Tag.objects.get(name=tag_name)
    run = TestRun.objects.get(pk=run_id)

    run.remove_tag(unwanted_tag)

    remaining_tags = run.tag.values("id", "name")
    return list(remaining_tags)
@permissions_required("testruns.add_testrun")
@rpc_method(name="TestRun.create")
def create(values, **kwargs):
    """
    .. function:: RPC TestRun.create(values)

        Create a new TestRun object and store it in the database. When
        ``default_tester`` is missing or empty, the user making the request
        is used instead.

        :param values: Field values for :class:`tcms.testruns.models.TestRun`
        :type values: dict
        :param \\**kwargs: Dict providing access to the current request, protocol,
                entry point name and handler instance from the rpc method
        :return: Serialized :class:`tcms.testruns.models.TestRun` object
        :rtype: dict
        :raises PermissionDenied: if missing *testruns.add_testrun* permission
        :raises ValueError: if data validations fail

        Example::

            >>> values = {'build': 384,
                    'manager': 137,
                    'plan': 137,
                    'summary': 'Testing XML-RPC for TCMS',
                }
            >>> TestRun.create(values)
    """
    if not values.get("default_tester"):
        # fall back to whoever is making the RPC call
        values["default_tester"] = kwargs.get(REQUEST_KEY).user.pk

    form = NewRunForm(values)
    form.populate(values.get("plan"))

    if not form.is_valid():
        raise ValueError(list(form.errors.items()))

    new_run = form.save()
    serialized = model_to_dict(new_run, exclude=["cc", "tag"])
    # set explicitly b/c the value is set in the DB directly and,
    # if None, model_to_dict() will not return it
    serialized["start_date"] = new_run.start_date
    return serialized
@permissions_required("testruns.view_testrun")
@rpc_method(name="TestRun.filter")
def filter(query=None):  # pylint: disable=redefined-builtin
    """
    .. function:: RPC TestRun.filter(query)

        Search for test runs and return the matching records. Omitting
        ``query`` applies no filtering.

        :param query: Field lookups for :class:`tcms.testruns.models.TestRun`
        :type query: dict
        :return: List of serialized :class:`tcms.testruns.models.TestRun` objects
        :rtype: list(dict)
    """
    lookups = {} if query is None else query

    fields = (
        "id",
        "start_date",
        "stop_date",
        "planned_start",
        "planned_stop",
        "summary",
        "notes",
        "plan",
        "plan__name",
        "build",
        "build__name",
        "build__version",
        "build__version__value",
        "build__version__product",
        "manager",
        "manager__username",
        "default_tester",
        "default_tester__username",
    )

    matches = TestRun.objects.filter(**lookups).values(*fields).distinct()
    return list(matches)
@permissions_required("testruns.change_testrun")
@rpc_method(name="TestRun.update")
def update(run_id, values):
    """
    .. function:: RPC TestRun.update(run_id, values)

        Apply new field values to the selected TestRun.

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param values: Field values for :class:`tcms.testruns.models.TestRun`
        :type values: dict
        :return: Serialized :class:`tcms.testruns.models.TestRun` object
        :rtype: dict
        :raises PermissionDenied: if missing *testruns.change_testrun* permission
        :raises ValueError: if data validations fail
    """
    existing_run = TestRun.objects.get(pk=run_id)
    form = UpdateForm(values, instance=existing_run)

    # In the rare case where this TR is reassigned to another TP
    # don't validate if TR.build has a FK relationship with TP.product_version.
    # Instead all Build IDs should be valid
    plan_is_unchanged = "plan" not in values
    if plan_is_unchanged:
        form.populate(version_id=existing_run.plan.product_version_id)

    if not form.is_valid():
        raise ValueError(list(form.errors.items()))

    saved_run = form.save()
    serialized = model_to_dict(saved_run, exclude=["cc", "tag"])
    # set explicitly b/c the values are set in the DB directly and,
    # if None, model_to_dict() will not return them
    serialized["start_date"] = saved_run.start_date
    serialized["stop_date"] = saved_run.stop_date
    return serialized
@permissions_required("testruns.change_testrun")
@rpc_method(name="TestRun.add_cc")
def add_cc(run_id, username):
    """
    .. function:: RPC TestRun.add_cc(run_id, username)

        Put the chosen user on the CC list of a TestRun.

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param username: PK, email or username
        :type username: string
        :raises DoesNotExist: if test run specified by the PK doesn't exist
        :raises PermissionDenied: if missing *testruns.change_testrun* permission
        :raises ValueError: if data validations fail
    """
    # the test run is fetched before the user is validated
    run = TestRun.objects.get(pk=run_id)

    user_form = UserForm({"user": username})
    if not user_form.is_valid():
        raise ValueError(list(user_form.errors.items()))

    user = user_form.cleaned_data["user"]
    run.add_cc(user)
@permissions_required("testruns.change_testrun")
@rpc_method(name="TestRun.remove_cc")
def remove_cc(run_id, username):
    """
    .. function:: RPC TestRun.remove_cc(run_id, username)

        Take the chosen user off the CC list of a TestRun.

        :param run_id: PK of TestRun to modify
        :type run_id: int
        :param username: PK, email or username
        :type username: string
        :raises DoesNotExist: if test run specified by the PK doesn't exist
        :raises PermissionDenied: if missing *testruns.change_testrun* permission
        :raises ValueError: if data validations fail
    """
    # the test run is fetched before the user is validated
    run = TestRun.objects.get(pk=run_id)

    user_form = UserForm({"user": username})
    if not user_form.is_valid():
        raise ValueError(list(user_form.errors.items()))

    user = user_form.cleaned_data["user"]
    run.remove_cc(user)
@permissions_required("testruns.view_property")
@rpc_method(name="TestRun.properties")
def properties(query=None):
    """
    .. function:: TestRun.properties(query)

        Return the properties matching ``query``, ordered by run, name
        and value. Omitting ``query`` applies no filtering.

        :param query: Field lookups for :class:`tcms.testruns.models.Property`
        :type query: dict
        :return: Serialized list of :class:`tcms.testruns.models.Property` objects.
        :rtype: list(dict)
        :raises PermissionDenied: if missing *testruns.view_property* permission
    """
    lookups = {} if query is None else query

    matches = (
        Property.objects.filter(**lookups)
        .values("id", "run", "name", "value")
        .order_by("run", "name", "value")
        .distinct()
    )
    return list(matches)
@permissions_required("attachments.add_attachment")
@rpc_method(name="TestRun.add_attachment")
def add_attachment(run_id, filename, b64content, **kwargs):
    """
    .. function:: RPC TestRun.add_attachment(run_id, filename, b64content)

        Attach a file to the given TestRun on behalf of the user making
        the request.

        :param run_id: PK of TestRun
        :type run_id: int
        :param filename: File name of attachment, e.g. 'logs.txt'
        :type filename: str
        :param b64content: Base64 encoded content
        :type b64content: str
        :param \\**kwargs: Dict providing access to the current request, protocol,
                entry point name and handler instance from the rpc method
    """
    uploader = kwargs.get(REQUEST_KEY).user
    utils.add_attachment(run_id, "testruns.TestRun", uploader, filename, b64content)
@permissions_required("testruns.delete_testrun")
@rpc_method(name="TestRun.remove")
def remove(query):
    """
    .. function:: RPC TestRun.remove(query)

        Delete every TestRun object matching ``query``.

        :param query: Field lookups for :class:`tcms.testruns.models.TestRun`
        :type query: dict
        :raises PermissionDenied: if missing the *testruns.delete_testrun* permission
        :return: The number of objects deleted and a dictionary with the
                 number of deletions per object type.
        :rtype: int, dict

    .. versionadded:: 13.5
    """
    matching_runs = TestRun.objects.filter(**query)
    return matching_runs.delete()