swimlane/swimlane-python

View on GitHub
swimlane/core/resources/record.py

Summary

Maintainability
D
1 day
Test Coverage
import copy
from functools import total_ordering
import time
import pendulum
import six
from swimlane.core.resources.base import APIResource
from swimlane.core.resources.usergroup import UserGroup, User
from swimlane.exceptions import SwimlaneException, UnknownField, ValidationError
import swimlane.core.adapters.task  # avoid circular reference
import swimlane.core.adapters.helper  # avoid circular reference



@total_ordering
class Record(APIResource):
    """A single Swimlane Record instance

    Attributes:
        id (str): Full Record ID
        tracking_id (str): Record tracking ID
        created (pendulum.DateTime): Pendulum datetime for Record created date
        modified (pendulum.DateTime): Pendulum datetime for Record last modified date
        is_new (bool): True if Record does not yet exist on server. Other values may be temporarily None if True
        app (App): App instance that Record belongs to
    """

    _type = 'Core.Models.Record.Record, Core'

    def __init__(self, app, raw):
        super(Record, self).__init__(app._swimlane, raw)

        self.__app = app

        self.is_new = self._raw.get('isNew', False)

        # Protect against creation from generic raw data not yet containing server-generated values
        if self.is_new:
            self.id = self.tracking_id = self.created = self.modified = None
        else:
            record_app_id = raw['applicationId']
            if record_app_id != app.id:
                raise ValueError('Record applicationId `{}` does not match source app id `{}`'.format(
                    record_app_id,
                    app.id
                ))

            self.id = self._raw['id']

            # Combine app acronym + trackingId instead of using trackingFull raw
            # for guaranteed value (not available through report results)
            self.tracking_id = '-'.join([
                self.app.acronym,
                str(int(self._raw['trackingId']))
            ])

            self.created = pendulum.parse(self._raw['createdDate'])
            self.modified = pendulum.parse(self._raw['modifiedDate'])

        self.__allowed = []

        self._fields = {}
        self.__premap_fields()

        # Get trackingFull if available
        if app.tracking_id in self._raw['values']:
            self._raw['trackingFull'] = self._raw['values'].get(app.tracking_id)

        self.__existing_values = {k: self.get_field(k).get_batch_representation() for (k, v) in self}
        self._comments_modified = False

        self.locked = False
        self.locking_user = None
        self.locked_date = None

        # avoid circular reference
        from swimlane.core.adapters import RecordRevisionAdapter
        self.revisions = RecordRevisionAdapter(app, self)

    @property
    def app(self):
        return self.__app

    def __str__(self):
        if self.is_new:
            return '{} - New'.format(self.app.acronym)

        return str(self.tracking_id)

    def __setitem__(self, field_name, value):
        self.get_field(field_name).set_python(value)

    def __getitem__(self, field_name):
        return self.get_field(field_name).get_item()

    def __delitem__(self, field_name):
        self[field_name] = None

    def __iter__(self):
        for field_name, field in six.iteritems(self._fields):
            yield field_name, field.get_python()

    def __hash__(self):
        return hash((self.id, self.app))

    def __lt__(self, other):
        if not isinstance(other, self.__class__):
            raise TypeError("Comparisons not supported between instances of '{}' and '{}'".format(
                other.__class__.__name__,
                self.__class__.__name__
            ))

        tracking_number_self = int(self.tracking_id.split('-')[1])
        tracking_number_other = int(other.tracking_id.split('-')[1])

        return (self.app.name, tracking_number_self) < (other.app.name, tracking_number_other)

    def __premap_fields(self):
        """Build field instances using field definitions in app manifest

        Map raw record field data into appropriate field instances with their correct respective types
        """
        # Circular imports
        from swimlane.core.fields import resolve_field_class

        for field_definition in self.app._raw['fields']:
            field_class = resolve_field_class(field_definition)

            field_instance = field_class(field_definition['name'], self)
            value = self._raw['values'].get(field_instance.id)
            field_instance.set_swimlane(value)

            self._fields[field_instance.name] = field_instance

    def get_cache_index_keys(self):
        """Return values available for retrieving records, but only for already existing records"""
        if not (self.id and self.tracking_id):
            raise NotImplementedError

        return {
            'id': self.id,
            'tracking_id': self.tracking_id
        }

    def get_field(self, field_name):
        """Get field instance used to get, set, and serialize internal field value

        Args:
            field_name (str): Field name or key to retrieve

        Returns:
            Field: Requested field instance

        Raises:
             UnknownField: Raised if `field_name` not found in parent App
        """
        try:
            return self._fields[self.app.resolve_field_name(field_name)]
        except KeyError:
            raise UnknownField(self.app, field_name, self._fields.keys())

    def validate(self):
        """Explicitly validate field data

        Notes:
            Called automatically during save call before sending data to server

        Raises:
             ValidationError: If any fields fail validation
        """
        for field in (_field for _field in six.itervalues(self._fields) if _field.required):
            if field.get_swimlane() is None:
                raise ValidationError(
                    self, 'Required field "{}" is not set'.format(field.name))

    def __request_and_reinitialize(self, method, endpoint, data):
        response = self._swimlane.request(
            method,
            endpoint,
            json=data
        )

        # Reinitialize record with new raw content returned from server to update any calculated fields
        self.__init__(self.app, response.json())

        # Manually cache self after save to keep cache updated with latest data
        self._swimlane.resources_cache.cache(self)

    def save(self):
        """Persist record changes on Swimlane server

        Updates internal raw data with response content from server to guarantee calculated field values match values on
        server

        Raises:
            ValidationError: If any fields fail validation
        """

        if self.is_new:
            method = 'post'
        else:
            method = 'put'

        # Pop off fields with None value to allow for saving empty fields
        copy_raw = copy.copy(self._raw)
        values_dict = {}
        for key, value in six.iteritems(copy_raw['values']):
            if value is not None:
                values_dict[key] = value
        copy_raw['values'] = values_dict

        self.validate()

        self.__request_and_reinitialize(
            method,
            'app/{}/record'.format(self.app.id),
            copy_raw
        )

    def patch(self):
        """Patch record on Swimlane server

        Raises
            ValueError: If record.is_new, or if comments or attachments are attempted to be patched
        """
        if self.is_new:
            raise ValueError('Cannot patch a new Record')
        elif self._comments_modified:
            raise ValueError('Can not patch with added comments')

        copy_raw = copy.copy(self._raw)

        pending_values = {k: self.get_field(k).get_batch_representation() for (k, v) in self}
        patch_values = {
            self.get_field(k).id: pending_values[k] for k in set(pending_values) & set(self.__existing_values)
            if pending_values[k] != self.__existing_values[k]
        }

        for field_id, value in six.iteritems(patch_values):
            #
            if self.app.get_field_definition_by_id(field_id)['fieldType'] == 'attachment':
                raise ValueError('Can not patch new attachments')
            # Use None for empty arrays to ensure field is removed from Record on PATCH
            if not value:
                patch_values[field_id] = None

        # $type needed here for dotnet to deserialize correctly
        patch_values['$type'] = self._raw['values']['$type']
        copy_raw['values'] = patch_values

        self.validate()

        self.__request_and_reinitialize(
            'patch',
            'app/{}/record/{}'.format(self.app.id, self.id),
            copy_raw
        )

    def delete(self):
        """Delete record from Swimlane server

        .. versionadded:: 2.16.1

        Resets to new state, but leaves field data as-is. Saving a deleted record will create a new Swimlane record

        Raises
            ValueError: If record.is_new
        """
        if self.is_new:
            raise ValueError('Cannot delete a new Record')

        self._swimlane.request(
            'delete',
            'app/{}/record/{}'.format(self.app.id, self.id)
        )

        del self._swimlane.resources_cache[self]

        # Modify current raw values indicating an unsaved record but persisting field data
        raw = copy.deepcopy(self._raw)
        raw['id'] = None
        raw['isNew'] = True

        self.__init__(self.app, raw)

    def for_json(self, *field_names):
        """Returns json.dump()-compatible dict representation of the record

        .. versionadded:: 4.1

        Useful for resolving any Cursor, datetime/Pendulum, etc. field values to useful formats outside of Python

        Args:
            *field_names (str): Optional subset of field(s) to include in returned dict. Defaults to all fields

        Raises:
             UnknownField: Raised if any of `field_names` not found in parent App

        Returns:
            dict: field names -> JSON compatible field values
        """
        field_names = field_names or self._fields.keys()

        return {field_name: self.get_field(field_name).for_json() for field_name in field_names}

    @property
    def restrictions(self):
        """Returns cached set of retrieved UserGroups in the record's list of allowed accounts"""
        return [UserGroup(self._swimlane, raw) for raw in self._raw['allowed']]

    def add_restriction(self, *usergroups):
        """Add UserGroup(s) to list of accounts with access to record

        .. versionadded:: 2.16.1

        UserGroups already in the restricted list can be added multiple times and duplicates will be ignored

        Notes:

        Args:
            *usergroups (UserGroup): 1 or more Swimlane UserGroup(s) to add to restriction list

        Raises:
            TypeError: If 0 UserGroups provided or provided a non-UserGroup instance
        """
        if not usergroups:
            raise TypeError(
                'Must provide at least one UserGroup for restriction')

        allowed = copy.copy(self._raw.get('allowed', []))

        for usergroup in usergroups:
            if not isinstance(usergroup, UserGroup):
                raise TypeError(
                    'Expected UserGroup, received `{}` instead'.format(usergroup))

            selection = usergroup.as_usergroup_selection()
            if selection not in allowed:
                allowed.append(selection)

        self.validate()
        self._swimlane.request(
            'put',
            'app/{}/record/{}/restrict'.format(self.app.id, self.id),
            json=allowed
        )

        self._raw['allowed'] = allowed

    def remove_restriction(self, *usergroups):
        """Remove UserGroup(s) from list of accounts with access to record

        .. versionadded:: 2.16.1

        Notes:

        Warnings:
            Providing no UserGroups will clear the restriction list, opening access to ALL accounts

        Args:
            *usergroups (UserGroup): 0 or more Swimlane UserGroup(s) to remove from restriction list

        Raises:
            TypeError: If provided a non-UserGroup instance
            ValueError: If provided UserGroup not in current restriction list
        """
        if usergroups:
            allowed = copy.copy(self._raw.get('allowed', []))

            for usergroup in usergroups:
                if not isinstance(usergroup, UserGroup):
                    raise TypeError(
                        'Expected UserGroup, received `{}` instead'.format(usergroup))
                try:
                    allowed.remove(usergroup.as_usergroup_selection())
                except ValueError:
                    raise ValueError(
                        'UserGroup `{}` not in record `{}` restriction list'.format(usergroup, self))
        else:
            allowed = []

        self.validate()
        self._swimlane.request(
            'put',
            'app/{}/record/{}/restrict'.format(self.app.id, self.id),
            json=allowed
        )

        self._raw['allowed'] = allowed

    def lock(self):
        """
        Lock the record to the Current User.


        Notes:

        Warnings:

        Args:

        """
        self.validate()
        response = self._swimlane.request(
            'post',
            'app/{}/record/{}/lock'.format(
                self.app.id, self.id)
        ).json()
        self.locked = True
        self.locking_user = User(self._swimlane, response['lockingUser'])
        self.locked_date = response['lockedDate']

    def unlock(self):
          """
          Unlock the record.


          Notes:

          Warnings:

          Args:

          """
          self.validate()
          self._swimlane.request(
              'post',
              'app/{}/record/{}/unlock'.format(
                  self.app.id, self.id)
          ).json()
          self.locked = False 
          self.locking_user = None
          self.locked_date = None
    
    def execute_task(self, task_name, timeout=int(20)):
        job_info = swimlane.core.adapters.task.TaskAdapter(self.app._swimlane).execute(task_name, self._raw)
        timeout_start = pendulum.now()
        while pendulum.now() < timeout_start.add(seconds=timeout):
            status = self.app._swimlane.helpers.check_bulk_job_status(job_info.text)
            if len(status):
                for item in status:
                    if item.get('status') == 'completed':
                        self.__request_and_reinitialize(
                            'get', '/app/{appId}/record/{id}'.format(appId=self.app.id, id=self.id), None)
                        timeout = 0
                    if item.get('status') == 'failed':
                        raise SwimlaneException('Task failed: {}'.format(item.get('message')))
            time.sleep(1)
        

def record_factory(app, fields=None):
    """Return a temporary Record instance to be used for field validation and value parsing

    Args:
        app (App): Target App to create a transient Record instance for
        fields (dict): Optional dict of fields and values to set on new Record instance before returning

    Returns:
        Record: Unsaved Record instance to be used for validation, creation, etc.
    """
    # pylint: disable=line-too-long
    record = Record(app, {
        '$type': Record._type,
        'isNew': True,
        'applicationId': app.id,
        'comments': {
            '$type': 'System.Collections.Generic.Dictionary`2[[System.String, mscorlib],[System.Collections.Generic.List`1[[Core.Models.Record.Comments, Core]], mscorlib]], mscorlib'
        },
        'values': {
            '$type': 'System.Collections.Generic.Dictionary`2[[System.String, mscorlib],[System.Object, mscorlib]], mscorlib'
        }
    })

    fields = fields or {}

    # Apply Default Values
    for name, value in six.iteritems(app._defaults):
        record[name] = value

    # Apply Provided Field Values
    for name, value in six.iteritems(fields):
        record[name] = value

    # Pop off fields with None value to allow for saving empty fields
    copy_raw = copy.copy(record._raw)
    values_dict = {}
    for key, value in six.iteritems(copy_raw['values']):
        if value is not None:
            values_dict[key] = value
    record._raw['values'] = values_dict

    return record