eregs/regulations-core

View on GitHub
regcore/db/django_models.py

Summary

Maintainability
F
3 days
Test Coverage
"""Each of the data structures relevant to the API (regulations, notices,
etc.), implemented using Django models"""
import collections

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist

from regcore.db import interface
from regcore.models import Diff, Document, Layer, Notice


def treeify(node, tree_id, pos=1, level=0):
    """Set tree properties in memory.
    """
    node['tree_id'] = tree_id
    node['level'] = level
    node['left'] = pos
    for child in node.get('children', []):
        pos = treeify(child, tree_id, pos=pos + 1, level=level + 1)
    pos = pos + 1
    node['right'] = pos
    return pos


def build_adjacency_map(regs):
    """Build mapping from node IDs to child records
    :param regs: List of `Document` records
    """
    ret = collections.defaultdict(list)
    for reg in regs:
        if reg.parent_id is not None:
            ret[reg.parent_id].append(reg)
    return ret


def build_id(reg, version=None):
    if version is not None:
        return '{0}:{1}'.format(version, '-'.join(reg['label']))
    return '-'.join(reg['label'])


class DMDocuments(interface.Documents):
    """Implementation of Django-models as regulations backend"""
    def get(self, doc_type, label, version=None):
        """Find the regulation label + version"""
        regs = Document.objects.filter(
            doc_type=doc_type,
            label_string=label,
            version=version,
        ).get_descendants(
            include_self=True,
        )
        regs = list(regs.all())
        if not regs:
            return None
        adjacency_map = build_adjacency_map(regs)
        return self._serialize(regs[0], adjacency_map)

    def _serialize(self, reg, adjacency_map):
        ret = {
            'label': reg.label_string.split('-'),
            'text': reg.text,
            'node_type': reg.node_type,
            'children': [
                self._serialize(child, adjacency_map)
                for child in adjacency_map.get(reg.id, [])
            ],
        }
        ret['lft'] = getattr(reg, 'lft', None)
        if reg.title:
            ret['title'] = reg.title
        return ret

    def _transform(self, reg, doc_type, version=None):
        """Create the Django object"""
        return Document(
            id=build_id(reg, version),
            doc_type=doc_type,
            version=version,
            parent_id=(
                build_id(reg['parent'], version)
                if reg.get('parent')
                else None
            ),
            tree_id=reg['tree_id'],
            level=reg['level'],
            lft=reg['left'],
            rght=reg['right'],
            label_string='-'.join(reg['label']),
            text=reg['text'],
            title=reg.get('title', ''),
            node_type=reg['node_type'],
            root=(len(reg['label']) == 1),
        )

    def bulk_delete(self, doc_type, root_label, version):
        """Delete all documents that match these params"""
        # This does not handle subparts. Ignoring that for now
        Document.objects.filter(
            version=version,
            doc_type=doc_type,
            label_string__startswith=root_label,
        ).delete()

    def bulk_insert(self, regs, doc_type, version):
        """Store all document objects"""
        treeify(regs[0], Document.objects._get_next_tree_id())
        Document.objects.bulk_create(
            [self._transform(r, doc_type, version) for r in regs],
            batch_size=settings.BATCH_SIZE)

    def listing(self, doc_type, label=None):
        """List regulation version-label pairs that match this label (or are
        root, if label is None)"""
        if label is None:
            query = Document.objects.filter(doc_type=doc_type, root=True)
        else:
            query = Document.objects.filter(
                doc_type=doc_type, label_string=label)

        query = query.only('version', 'label_string').order_by('version')
        # Flattens
        versions = [v for v in query.values_list('version', 'label_string')]
        return versions


class DMLayers(interface.Layers):
    """Implementation of Django-models as layers backend"""
    def _transform(self, layer, layer_name, doc_type):
        """Create a Django object"""
        layer = dict(layer)  # copy
        doc_id = layer.pop('doc_id')
        return Layer(name=layer_name, layer=layer, doc_type=doc_type,
                     doc_id=doc_id)

    def bulk_delete(self, layer_name, doc_type, root_doc_id):
        """Delete all layer data matching the parameters"""
        # This does not handle subparts; Ignoring that for now
        # @todo - use regex to avoid deleting 222-11 when replacing 22
        Layer.objects.filter(name=layer_name, doc_type=doc_type,
                             doc_id__startswith=root_doc_id).delete()

    def bulk_insert(self, layers, layer_name, doc_type):
        """Store all layer objects"""
        Layer.objects.bulk_create(
            [self._transform(l, layer_name, doc_type) for l in layers],
            batch_size=settings.BATCH_SIZE)

    def get(self, name, doc_type, doc_id):
        """Find the layer that matches these parameters"""
        try:
            layer = Layer.objects.get(name=name, doc_type=doc_type,
                                      doc_id=doc_id)
            return layer.layer
        except ObjectDoesNotExist:
            return None


class DMNotices(interface.Notices):
    """Implementation of Django-models as notice backend"""
    def delete(self, doc_number):
        Notice.objects.filter(document_number=doc_number).delete()

    def insert(self, doc_number, notice):
        """Store a single notice"""
        model = Notice(document_number=doc_number,
                       fr_url=notice['fr_url'],
                       publication_date=notice['publication_date'],
                       notice=notice)
        if 'effective_on' in notice:
            model.effective_on = notice['effective_on']
        model.save()
        for cfr_part in notice.get('cfr_parts', []):
            model.noticecfrpart_set.create(cfr_part=cfr_part)

    def get(self, doc_number):
        """Find the associated notice"""
        try:
            return Notice.objects.get(
                document_number=doc_number).notice
        except ObjectDoesNotExist:
            return None

    def listing(self, part=None):
        """All notices or filtered by cfr_part"""
        query = Notice.objects
        if part:
            query = query.filter(noticecfrpart__cfr_part=part)
        results = query.values('document_number', 'effective_on', 'fr_url',
                               'publication_date')
        for result in results:
            for key in ('effective_on', 'publication_date'):
                if result[key]:
                    result[key] = result[key].isoformat()
                else:
                    del result[key]
        return list(results)  # maintain compatibility with other backends


class DMDiffs(interface.Diffs):
    """Implementation of Django-models as diff backend"""
    def insert(self, label, old_version, new_version, diff):
        """Store a diff between two versions of a regulation node"""
        Diff(label=label, old_version=old_version, new_version=new_version,
             diff=diff).save()

    def delete(self, label, old_version, new_version):
        Diff.objects.filter(label=label, old_version=old_version,
                            new_version=new_version).delete()

    def get(self, label, old_version, new_version):
        """Find the associated diff"""
        try:
            diff = Diff.objects.get(label=label, old_version=old_version,
                                    new_version=new_version)
            return diff.diff
        except ObjectDoesNotExist:
            return None