CenterForOpenScience/scrapi

View on GitHub
scrapi/processing/postgres.py

Summary

Maintainability
A
3 hrs
Test Coverage
from __future__ import absolute_import

import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.api.settings")

import copy
import logging

import django

from api.webview.models import HarvesterResponse, Document, Version

from scrapi import events
from scrapi.util import json_without_bytes
from scrapi.linter import RawDocument, NormalizedDocument
from scrapi.processing import DocumentTuple
from scrapi.processing.base import BaseProcessor, BaseHarvesterResponse, BaseDatabaseManager

django.setup()
logger = logging.getLogger(__name__)


class DatabaseManager(BaseDatabaseManager):
    '''All database management is performed by django'''

    def setup(self):
        return True

    def tear_down(self):
        pass

    def clear(self, force=False):
        pass

    def celery_setup(self, *args, **kwargs):
        pass


def paginated(query, page_size=10):
    for offset in range(0, query.count(), page_size):
        for doc in query[offset:offset + page_size]:
            yield doc


class PostgresProcessor(BaseProcessor):
    NAME = 'postgres'

    manager = DatabaseManager()

    def documents(self, *sources):
        q = Document.objects.all()
        querysets = (q.filter(source=source) for source in sources) if sources else [q]
        for query in querysets:
            for doc in paginated(query):
                try:
                    raw = RawDocument(doc.raw, clean=False, validate=False)
                except AttributeError as e:
                    logger.info('{}  -- Malformed rawdoc in database, skipping'.format(e))
                    raw = None
                    continue
                normalized = NormalizedDocument(doc.normalized, validate=False, clean=False) if doc.normalized else None
                yield DocumentTuple(raw, normalized)

    def get(self, source, docID):
        try:
            document = Document.objects.get(source=source, docID=docID)
        except Document.DoesNotExist:
            return None
        raw = RawDocument(document.raw, clean=False, validate=False)
        normalized = NormalizedDocument(document.normalized, validate=False, clean=False) if document.normalized else None

        return DocumentTuple(raw, normalized)

    def delete(self, source, docID):
        doc = Document.objects.get(source=source, docID=docID)
        doc.delete()

    def create(self, attributes):
        attributes = json_without_bytes(attributes)
        Document.objects.create(
            source=attributes['source'],
            docID=attributes['docID'],
            providerUpdatedDateTime=None,
            raw=attributes,
            normalized=None
        ).save()

    @property
    def HarvesterResponseModel(self):
        return HarvesterResponseModel

    @events.logged(events.PROCESSING, 'raw.postgres')
    def process_raw(self, raw_doc):
        document = self.version(raw=raw_doc)
        timestamps = raw_doc.get('timestamps')
        modified_doc = copy.deepcopy(raw_doc.attributes)

        document.raw = modified_doc
        document.timestamps = timestamps

        document.save()

    @events.logged(events.PROCESSING, 'normalized.postgres')
    def process_normalized(self, raw_doc, normalized):
        document = self.version(raw=raw_doc, normalized=normalized)
        timestamps = raw_doc.get('timestamps') or normalized.get('timestamps')

        document.raw = raw_doc.attributes
        document.timestamps = timestamps
        document.normalized = normalized.attributes
        document.providerUpdatedDateTime = normalized['providerUpdatedDateTime']

        document.save()

    def _get_by_source_id(self, source, docID):
        try:
            return Document.objects.get(key=Document._make_key(source, docID))
        except Document.DoesNotExist:
            return None

    def version(self, raw=None, normalized=None):
        old_doc = self._get_by_source_id(raw['source'], raw['docID'])
        if old_doc:
            raw_changed = raw and self.different(raw.attributes, old_doc.raw)
            norm_changed = normalized and self.different(normalized.attributes, old_doc.normalized)
            version = Version(
                key=old_doc,
                source=old_doc.source,
                docID=old_doc.docID,
                providerUpdatedDateTime=old_doc.providerUpdatedDateTime,
                raw=old_doc.raw,
                normalized=old_doc.normalized,
                status=old_doc.status,
                timestamps=old_doc.timestamps
            )
            if raw_changed or norm_changed:
                version.save()
            return old_doc
        else:
            return Document.objects.create(source=raw['source'], docID=raw['docID'])

    def get_versions(self, source, docID):
        doc = self._get_by_source_id(source, docID)
        for version in doc.version_set.all().order_by('id'):
            yield DocumentTuple(
                RawDocument(version.raw, clean=False, validate=False),
                NormalizedDocument(version.normalized, clean=False, validate=False)
            )
        yield DocumentTuple(
            RawDocument(doc.raw, clean=False, validate=False),
            NormalizedDocument(doc.normalized, clean=False, validate=False)
        )


class HarvesterResponseModel(BaseHarvesterResponse):

    response = None

    def __init__(self, *args, **kwargs):
        if kwargs:
            key = kwargs['method'].lower() + kwargs['url'].lower()
            self.response = HarvesterResponse(key=key, *args, **kwargs)
        else:
            self.response = args[0]

    @property
    def method(self):
        return str(self.response.method)

    @property
    def url(self):
        return str(self.response.url)

    @property
    def ok(self):
        return bool(self.response.ok)

    @property
    def content(self):
        if isinstance(self.response.content, memoryview):
            return self.response.content.tobytes()
        if isinstance(self.response.content, bytes):
            return self.response.content
        return str(self.response.content)

    @property
    def encoding(self):
        return str(self.response.encoding)

    @property
    def headers_str(self):
        return str(self.response.headers_str)

    @property
    def status_code(self):
        return int(self.response.status_code)

    @property
    def time_made(self):
        return str(self.response.time_made)

    def save(self, *args, **kwargs):
        self.response.save()
        return self

    def update(self, **kwargs):
        for k, v in kwargs.items():
            setattr(self.response, k, v)
        return self.save()

    @classmethod
    def get(cls, url=None, method=None):
        key = method.lower() + url.lower()
        try:
            return cls(HarvesterResponse.objects.get(key=key))
        except HarvesterResponse.DoesNotExist:
            raise cls.DoesNotExist