testcube/core/api/views.py
import re
from datetime import datetime, timezone, timedelta
from django.db.models import Q
from ipware.ip import get_ip
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.permissions import IsAdminUser
from rest_framework.response import Response
from tagging.models import Tag
from testcube.settings import logger
from .filters import *
from .serializers import *
from ...utils import get_auto_cleanup_run_days, cleanup_run_media, object_to_dict, error_detail, to_json
def info_view(self, serializer_class):
self.serializer_class = serializer_class
instance = self.get_object()
serializer = self.get_serializer(instance)
return Response(serializer.data)
def list_view(self):
self.pagination_class = LimitOffsetPagination
queryset = self.filter_queryset(self.get_queryset())
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class TeamViewSet(viewsets.ModelViewSet):
queryset = Team.objects.all()
serializer_class = TeamSerializer
filter_fields = ('name', 'owner')
search_fields = filter_fields
@action(detail=False)
def recent(self, request):
"""get recent teams"""
self.queryset = Team.objects.order_by('name').all()
self.serializer_class = TeamListSerializer
return list_view(self)
class ProductViewSet(viewsets.ModelViewSet):
queryset = Product.objects.all()
serializer_class = ProductSerializer
filter_fields = ('name', 'owner', 'version', 'team')
search_fields = ('name', 'owner', 'version')
@action(detail=False)
def recent(self, request):
"""get recent teams"""
self.queryset = Product.objects.order_by('name').all()
self.serializer_class = ProductListSerializer
return list_view(self)
@action(methods=['get'], detail=True)
def tags(self, request, pk=None):
prod = self.get_object()
case_query = TestCase.objects.filter(product_id=prod.id).all()
tags = Tag.objects.usage_for_queryset(case_query)
tags = [t.name for t in tags]
return Response(data=tags)
class ConfigurationViewSet(viewsets.ModelViewSet):
permission_classes = [IsAdminUser]
queryset = Configuration.objects.all()
serializer_class = ConfigurationSerializer
filter_fields = ('key', 'value')
search_fields = filter_fields
class TestClientViewSet(viewsets.ModelViewSet):
queryset = TestClient.objects.all()
serializer_class = TestClientSerializer
filter_fields = ('name', 'ip', 'platform', 'owner')
search_fields = filter_fields
class TestRunViewSet(viewsets.ModelViewSet):
queryset = TestRun.objects.all()
serializer_class = TestRunSerializer
filter_fields = ('name', 'state', 'status', 'owner', 'product')
search_fields = ('name', 'owner')
@action(methods=['get'], detail=True)
def info(self, request, pk=None):
return info_view(self, TestRunDetailSerializer)
@action(methods=['get'], detail=True)
def tags(self, request, pk=None):
run = self.get_object()
case_query = TestCase.objects.filter(results__test_run_id=run.id).all()
tags = Tag.objects.usage_for_queryset(case_query, counts=True)
tags = [(t.name, t.count) for t in tags]
tags = sorted(tags, key=lambda tag: -tag[1])
no_tag_tc = len([tc for tc in case_query if not tc.tags])
if no_tag_tc:
tags.append(('no_tags', no_tag_tc))
return Response(data=tags)
@action(detail=False)
def recent(self, request):
"""get recent runs, in run list view"""
self.serializer_class = TestRunListSerializer
self.filter_class = TestRunFilter
return list_view(self)
@action(detail=False)
def clear(self, request):
"""clear dead runs, will be called async when user visit run list."""
pending_runs = TestRun.objects.filter(state__lt=2) # not ready, starting, running
fixed = []
for run in pending_runs:
delta = datetime.now(timezone.utc) - run.start_time
if delta.days > 1 and run.state < 2:
logger.info('abort run: {}'.format(run.id))
run.state, run.status = 2, 1 # abort, failed
run.save()
fixed.append(run.id)
bad_runs = TestRun.objects.filter(results=None) # run without results > 2 days
for run in bad_runs:
if (datetime.now(tz=timezone.utc) - run.start_time).days >= 2:
logger.info('delete run: {}'.format(run.id))
fixed.append(run.id)
run.delete()
return Response(data=fixed)
@action(detail=False)
def cleanup(self, request):
"""cleanup runs = delete old runs via config value."""
if 'days' in request.GET:
days = int(request.GET.get('days'))
else:
days = get_auto_cleanup_run_days()
if days <= 0:
return Response(data=[])
logger.info('clean up runs before {} days'.format(days))
time_threshold = datetime.now(tz=timezone.utc) - timedelta(days=days)
pending_runs = TestRun.objects.filter(start_time__lt=time_threshold)
fixed = []
for run in pending_runs:
logger.info('delete old run: {}'.format(run.id))
cleanup_run_media(run.id)
fixed.append(run.id)
run.delete()
return Response(data=fixed)
@action(methods=['get'], detail=True)
def history(self, request, pk=None):
"""get run history, will be used in run detail page."""
instance = self.get_object()
self.filter_fields = ()
self.queryset = TestRun.objects.filter(name=instance.name, product=instance.product)
self.serializer_class = TestRunListSerializer
return list_view(self)
@action(detail=False, methods=['post', 'get'])
def start(self, request):
"""client api to start a run, `example` as reference."""
example = {
'name': 'your run name',
'owner': 'run owner, default => current user',
'start_by': 'run starter, default => current user',
'source': {
'link': '=>optional, e.g. http://jenkins...',
'name': '=>optional, e.g. Jenkins'},
'product': {
'name': 'TestCube',
'version': '=>optional',
'owner': 'default => current user',
'team': {
'name': 'ATeam',
'owner': 'default => current user'
}
},
'variables': 'json.dumps(dict(environ))'
}
if request.method == 'GET':
return Response(data={
'message': 'Please post to this API, see below example.',
'example': example
})
try:
team_data = request.data['product'].pop('team')
product_data = request.data.pop('product')
source_data, variables_data = None, None
if 'source' in request.data:
source_data = request.data['source']
del request.data['source']
if 'variables' in request.data:
variables_data = request.data['variables']
del request.data['variables']
run_data = request.data
if 'owner' not in run_data:
run_data['owner'] = request.user.username
if 'start_by' not in run_data:
run_data['start_by'] = request.user.username
team_obj, _ = Team.objects.update_or_create(name=team_data['name'], defaults=team_data)
if not team_obj.owner:
team_obj.owner = request.user.username
team_obj.save()
product_data['team'] = team_obj
product_version = product_data.get('version', 'latest')
product_obj, _ = Product.objects.update_or_create(name=product_data['name'],
team=team_obj,
version=product_version,
defaults=product_data)
if not product_obj.owner:
product_obj.owner = request.user.username
product_obj.save()
# only add source when it's link available
if source_data and source_data['link']:
source_obj = ObjectSource.objects.create(**source_data)
run_data['source'] = source_obj
run_data['product'] = product_obj
run_data['state'] = 0 # starting
run_obj = TestRun.objects.create(**run_data)
if variables_data and to_json(variables_data):
from testcube.runner.models import RunVariables
RunVariables.objects.create(test_run=run_obj, data=variables_data)
return Response(data={
'success': True,
'run': object_to_dict(run_obj)
})
except Exception as e:
return Response(data={
'success': False,
'message': error_detail(e),
'example': example
})
@action(detail=False, methods=['post', 'get'])
def stop(self, request):
"""client api to stop a run, `example` as reference."""
example = {
'run_id': 123,
'state': 'int, default 3=>completed, (2=aborted)',
'status': 'int, default 1=>failed, (0=passed, 3=abandoned)',
'end_time': 'utc time, default=>now',
'source': {
'name': '=>optional, e.g. Jenkins',
'link': '=>optional, e.g. http://jenkins...'
}
}
if request.method == 'GET':
return Response(data={
'message': 'Please post to this API, see the example',
'example': example
})
try:
run_id = request.data.pop('run_id')
source_data = request.data.pop('source', None)
run_data = request.data
if source_data:
source_obj, _ = ObjectSource.objects.create(**source_data)
run_data['source'] = source_obj
if 'state' not in run_data:
run_data['state'] = 3 # completed
if 'status' not in run_data:
run_data['status'] = 1 # failed
if 'end_time' not in run_data:
run_data['end_time'] = datetime.now(timezone.utc)
run_obj = TestRun.objects.filter(pk=run_id)
run_obj.update(**run_data)
return Response(data={
'success': True,
'run': object_to_dict(run_obj.first())
})
except Exception as e:
return Response(data={
'success': False,
'message': error_detail(e),
'example': example
})
class TestCaseViewSet(viewsets.ModelViewSet):
queryset = TestCase.objects.all()
serializer_class = TestCaseSerializer
filter_fields = ('name', 'full_name', 'keyword', 'priority', 'owner', 'product')
search_fields = ('name', 'full_name', 'keyword')
@action(methods=['get'], detail=True)
def info(self, request, pk=None):
"""query result info, use for result detail page."""
return info_view(self, TestCaseDetailSerializer)
@action(methods=['get', 'post'], detail=True)
def tags(self, request, pk=None):
"""query result tags"""
from tagging.models import Tag
instance = self.get_object()
if request.method == 'POST':
method = request.POST.get('method', 'add')
tag_name = request.POST.get('tags', '').lower()
if not bool(re.fullmatch('[ _\w-]+', tag_name)):
return Response({'error': 'bad tag name!'}, status=400)
if method == 'add':
Tag.objects.add_tag(instance, tag_name)
elif method == 'remove':
tags = [t.name for t in instance.tags if t.name != tag_name]
instance.tags = ','.join(tags)
tags = [t.name for t in instance.tags]
return Response(data=tags)
@action(detail=False)
def recent(self, request):
"""get recent testcase, use for test case page."""
self.serializer_class = TestCaseListSerializer
self.filter_class = TestCaseFilter
return list_view(self)
@action(methods=['get'], detail=True)
def history(self, request, pk=None):
"""get test case history, use in test case view or result detail view."""
instance = self.get_object()
self.filter_fields = ()
self.queryset = TestResult.objects.filter(testcase__id=instance.id)
self.serializer_class = TestResultHistorySerializer
return list_view(self)
class TestResultViewSet(viewsets.ModelViewSet):
queryset = TestResult.objects.all()
serializer_class = TestResultSerializer
filter_fields = ('outcome', 'assigned_to')
search_fields = filter_fields
@action(methods=['get'], detail=True)
def info(self, request, pk=None):
"""query result info, use for result detail page."""
return info_view(self, TestResultDetailSerializer)
@action(methods=['get'], detail=True)
def files(self, request, pk=None):
"""query result files, use for result detail page."""
return info_view(self, TestResultFilesSerializer)
@action(methods=['get'], detail=True)
def resets(self, request, pk=None):
"""query result reset history, use for result detail page."""
return info_view(self, TestResultResetHistorySerializer)
@action(detail=False)
def recent(self, request):
"""get recent runs, in run list view"""
self.serializer_class = TestResultListSerializer
keyword = request.GET.get('search', None)
if keyword:
self.queryset = TestResult.objects.filter(Q(testcase__name__icontains=keyword) |
Q(error__message__icontains=keyword))
self.search_fields = ()
return list_view(self)
@action(detail=False, methods=['get', 'post'])
def new(self, request):
"""client api to create a result, `example` as reference"""
example = {
'run_id': 123,
'outcome': 'int, 0=passed, 1=failed, 2=skipped, 3=error, 5=pending',
'stdout': 'the log output',
'duration': 'float, in seconds',
'assign_to': ' owner / optional',
'testcase': {
'name': 'short name, e.g. VerifyLoginFailed',
'full_name': 'long name, e.g. tests.login_tests.VerifyLoginFailed',
'description': 'optional',
'owner': 'default => current user'
},
'test_client': {
'name': 'client name',
'ip': 'optional, default to current ip',
'platform': 'optional, client platform',
'owner': 'default => current user',
},
'error': {
'exception_type': 'optional, e.g, AssertError',
'message': 'optional, the message of exception',
'stacktrace': 'optional, the stack trace info',
'stdout': 'optional',
'stderr': 'optional'
}
}
if request.method == 'GET':
return Response(data={
'message': 'Please post to this API, see the example.',
'example': example
})
try:
run_id = request.data.pop('run_id')
case_data = request.data.pop('testcase')
client_data = request.data.pop('test_client')
error_data = request.data.pop('error', None)
result_data = request.data
if 'ip' not in client_data:
client_data['ip'] = get_ip(request)
run_obj = TestRun.objects.get(id=run_id)
case_data['product'] = run_obj.product
case_obj, _ = TestCase.objects.update_or_create(full_name=case_data['full_name'], defaults=case_data)
client_obj, _ = TestClient.objects.update_or_create(name=client_data['name'], defaults=client_data)
if not case_obj.owner:
case_obj.owner = request.user.username
case_obj.save()
if not case_obj.created_by:
case_obj.created_by = request.user.username
case_obj.save()
if not client_obj.owner:
client_obj.owner = request.user.username
client_obj.save()
if error_data and error_data['exception_type']:
error_obj = ResultError.objects.create(**error_data)
result_data['error'] = error_obj
result_data['test_run'] = run_obj
result_data['testcase'] = case_obj
result_data['test_client'] = client_obj
result_data['duration'] = timedelta(seconds=result_data['duration'])
result_obj = TestResult.objects.create(**result_data)
return Response(data={
'success': True,
'result': object_to_dict(result_obj)
})
except Exception as e:
return Response(data={
'success': False,
'message': error_detail(e),
'example': example
})
class IssueViewSet(viewsets.ModelViewSet):
queryset = Issue.objects.all()
serializer_class = IssueSerializer
filter_fields = ('name', 'summary', 'created_by', 'assigned_to', 'status')
search_fields = filter_fields
class ResultAnalysisViewSet(viewsets.ModelViewSet):
queryset = ResultAnalysis.objects.all()
serializer_class = ResultAnalysisSerializer
filter_fields = ('by', 'reason', 'description')
search_fields = filter_fields
class ResultErrorViewSet(viewsets.ModelViewSet):
queryset = ResultError.objects.all()
serializer_class = ResultErrorSerializer
filter_fields = ('exception_type', 'message', 'stacktrace', 'stdout')
search_fields = filter_fields
class ObjectSourceViewSet(viewsets.ModelViewSet):
queryset = ObjectSource.objects.all()
serializer_class = ObjectSourceSerializer
filter_fields = ()
search_fields = filter_fields
class ResultFileViewSet(viewsets.ModelViewSet):
queryset = ResultFile.objects.all()
serializer_class = ResultFileSerializer
filter_fields = ()
search_fields = filter_fields
@action(detail=False, methods=['get', 'post'])
def new(self, request):
"""client api to upload result files, `example` as reference."""
valid_file_types = ['.png', '.jpg', '.jpeg', '.bmp', '.gif', '.txt', '.log', '.csv']
example = {
'run_id': 123,
'case_full_name': 'optional, if not provided your file name should contains test case name',
'file': 'your file stream'
}
if request.method == 'GET':
return Response(data={
'message': 'Please post to this API, see the example.',
'example': example
})
try:
run_id = int(request.data['run_id'])
case_name = request.data.get('case_full_name', None)
file = request.data['file']
file_ext = file.name.split('.')[-1]
assert '.' + file_ext in valid_file_types, 'Not allow such file type!'
file_data = {
'file': file,
'name': '/'.join([i for i in [case_name, file.name] if i]),
'file_byte_size': file.size,
'run': TestRun.objects.get(pk=run_id)
}
file_obj = ResultFile.objects.create(**file_data)
return Response(data={
'success': True,
'file': object_to_dict(file_obj)
})
except Exception as e:
return Response(data={
'success': False,
'message': error_detail(e),
'example': example
})
class ResetResultViewSet(viewsets.ModelViewSet):
queryset = ResetResult.objects.all()
serializer_class = ResetResultSerializer
filter_fields = ('id',)
search_fields = filter_fields
@action(detail=False)
def clear(self, request):
"""clear dead results and reset tasks, will be called async when user visit run detail page."""
pending_resets = ResetResult.objects.filter(reset_status__lt=2) # none, in progress
fixed = []
for result in pending_resets:
delta = datetime.now(timezone.utc) - result.reset_on
if delta.days > 1:
logger.info('abort reset result: {}'.format(result.id))
result.outcome, result.reset_status = 1, 3 # failed, failed
result.stdout = 'Reset task timeout.'
result.save()
fixed.append(result.id)
return Response(data=fixed)
@action(methods=['get', 'post'], detail=True)
def handler(self, request, pk=None):
"""
Handle single reset result.
1. update current reset result with provided info.
2. create error object if required.
3. update original result with latest outcome.
4. update original run to passed if all result passed
"""
if request.method == 'GET':
return self.retrieve(self, request, pk=pk)
current_reset = self.get_object()
assert isinstance(current_reset, ResetResult)
required_fields = ['outcome', 'duration', 'run_on', 'test_client', 'stdout']
optional_field = ['exception_type', 'message', 'stacktrace', 'stdout', 'stderr']
try:
for f in required_fields:
value = self.request.POST.get(f)
if f == 'stdout' and not value:
value = None
if value is None:
raise ValueError('Field "{}" is required!'.format(f))
if f == 'duration':
current_reset.duration = timedelta(seconds=float(value))
elif f == 'test_client':
current_reset.test_client = TestClient.objects.get(id=int(value))
else:
setattr(current_reset, f, value)
has_error = self.request.POST.get(optional_field[0], None)
if has_error:
error = ResultError() if not current_reset.error else current_reset.error
for f in optional_field:
value = self.request.POST.get(f, None)
setattr(error, f, value)
error.save()
current_reset.error = error
# update original result outcome to current reset outcome
current_reset.reset_status = 2 # done
current_reset.save()
current_reset.origin_result.outcome = current_reset.outcome
current_reset.origin_result.save()
# update original run status to passed if all passed
run = current_reset.origin_result.test_run
if run.result_total() == run.result_passed():
run.status = 0 # passed
run.save()
return Response(data={'message': 'Result has been saved.'})
except Exception as e:
logger.exception('Failed to handle reset result: {}'.format(pk))
current_reset.reset_status = 3 # failed
current_reset.save()
return Response(data={'message': str(e.args)}, status=400)
@action(detail=False, methods=['get', 'post'])
def new(self, request):
"""client api to reset a result, `example` as reference."""
example = {
'reset_id': 123,
'outcome': 'int, 0=passed, 1=failed, 2=skipped, 3=error, 5=pending',
'stdout': 'the log output',
'duration': 'float, in seconds',
'testcase': {
'full_name': 'long name, e.g. tests.login_tests.VerifyLoginFailed'
},
'test_client': {
'name': 'client name',
'ip': 'optional, default to current ip',
'platform': 'optional, client platform',
},
'error': {
'exception_type': 'optional, e.g, AssertError',
'message': 'optional, the message of exception',
'stacktrace': 'optional, the stack trace info'
}
}
if request.method == 'GET':
return Response(data={
'message': 'Please post to this API, see the example.',
'example': example
})
try:
reset_id = request.data.pop('reset_id')
case_data = request.data.pop('testcase')
client_data = request.data.pop('test_client')
error_data = request.data.pop('error', None)
if 'ip' not in client_data:
client_data['ip'] = get_ip(request)
reset_obj = ResetResult.objects.get(id=reset_id)
if reset_obj.origin_result.testcase.full_name != case_data['full_name']:
return Response(data={
'success': False,
'message': 'Testcase does not match current reset result!',
'example': example
})
client_obj, _ = TestClient.objects.update_or_create(name=client_data['name'], defaults=client_data)
if not client_obj.owner:
client_obj.owner = request.user.username
client_obj.save()
if error_data and error_data['exception_type']:
error_obj = ResultError.objects.create(**error_data)
reset_obj.error = error_obj
reset_obj.outcome = request.data['outcome']
reset_obj.stdout = request.data.get('stdout', None)
reset_obj.duration = timedelta(seconds=request.data['duration'])
reset_obj.run_on = datetime.now(timezone.utc)
reset_obj.test_client = client_obj
reset_obj.reset_status = 2 # done
reset_obj.save()
# update original result outcome to current reset outcome
reset_obj.origin_result.outcome = reset_obj.outcome
reset_obj.origin_result.save()
# update original run status to passed if all passed
run = reset_obj.origin_result.test_run
if run.result_passed() == run.result_total():
run.status = 0 # passed
run.save()
return Response(data={
'success': True,
'reset': object_to_dict(reset_obj)
})
except Exception as e:
return Response(data={
'success': False,
'message': error_detail(e),
'example': example
})