savkov/planchet

View on GitHub
planchet/core.py

Summary

Maintainability
A
1 hr
Test Coverage
import json
import logging
from typing import Callable, List, Dict, Union, Tuple

from redis import Redis

from planchet import io

# NOTE(review): this configures the root logger at import time, which affects
# every program importing this module; kept as-is for backward compatibility.
_fmt = '%(message)s'
logging.basicConfig(format=_fmt, level=logging.DEBUG)

# Per-item status values written to the ledger.
SERVED, RECEIVED, ERROR = 'SERVED', 'RECEIVED', 'ERROR'

# Overall job status values reported by ``Job.status``.
IN_PROGRESS, COMPLETE = 'IN_PROGRESS', 'COMPLETE'

# Job mode identifiers (stored in ``Job.mode``).
READ_ONLY, WRITE_ONLY, READ_WRITE = 'read', 'write', 'read-write'


class Job:
    """
    Create a new Job object.

    A job pulls ``(id, item)`` pairs from ``reader``, hands them out via
    :meth:`serve`, accepts processed items via :meth:`receive` (forwarding
    them to ``writer``) and records each item's state in ``ledger`` under the
    key ``'<name>:<id>'``.

    :param name: job name; also the prefix of this job's ledger keys
    :param reader: callable taking a batch size and returning a batch of
        ``(id, item)`` pairs (an empty batch means the input is exhausted)
    :param writer: callable taking a list of items; must also provide
        ``clean()``
    :param ledger: Redis connection used as the ledger
    :param mode: writing mode (``READ_ONLY``, ``WRITE_ONLY`` or
        ``READ_WRITE``)
    :param cont: make a repair job if True: items already marked ``SERVED``
        in the ledger are served again
    """
    def __init__(self, name: str, reader: Callable, writer: Callable,
                 ledger: Redis, mode: str = READ_WRITE,
                 cont: bool = False):
        self.name = name
        self.reader = reader
        self.writer = writer
        self.ledger = ledger
        self.mode = mode
        self.served = set()
        self.received = set()
        self.exhausted = False
        self.cont = cont
        # Rebuild the in-memory served/received sets from existing ledger
        # entries so that a job can be resumed.
        self.restore_records(self)

    def serve(self, n_items: int) -> List:
        """
        Send up to `n_items` items to the user.

        An item is served when it has no ledger status, or, for a repair job
        (``cont=True``), when its status is ``SERVED``. Served items are
        marked ``SERVED`` in the ledger. When the reader returns an empty
        batch, the job is flagged as exhausted and the items collected so far
        (possibly fewer than `n_items`) are returned.

        :param n_items: maximum number of items to serve
        :return: list of ``(id, item)`` pairs
        """
        items: List = []
        while len(items) < n_items:
            buff = self.reader(n_items - len(items))
            for id_, item in buff:
                status = self._ledger_status(id_)
                if not status or (self.cont and status == SERVED):
                    items.append((id_, item))
                    self.ledger.set(self.ledger_id(id_), SERVED)
                    self.served.add(id_)
            if not buff:
                self.exhausted = True
                break
        return items

    def receive(self, items: List[Tuple[int, Union[Dict, List]]],
                overwrite: bool):
        """
        Receive a list of processed items from a job, write them to output and
        mark them as received.

        In ``READ_WRITE`` mode, items the ledger already marks as ``RECEIVED``
        are skipped unless `overwrite` is True. Items without a ledger entry
        (never served, or removed by :meth:`clean` / :meth:`restart`) are
        written like any other item.

        :param items: processed ``(id, item)`` pairs
        :param overwrite: write items again even if already received
        """
        ids = []
        data = []
        for id_, item in items:
            # Skip records that the ledger says were written already. This
            # does not apply to dumping (non read-write) jobs. A missing
            # ledger entry decodes to None and is not treated as received.
            if (self.mode == READ_WRITE and not overwrite and
                    self._ledger_status(id_) == RECEIVED):
                continue
            ids.append(id_)
            data.append(item)
        self.writer(data)
        for id_ in ids:
            self.received.add(id_)
            self.served.discard(id_)
            self.ledger.set(self.ledger_id(id_), RECEIVED)

    def mark_errors(self, ids):
        """
        Mark the items with IDs in `ids` as errors in the ledger.

        Only the ledger is updated; the ids stay in ``self.served``.

        :param ids: IDs of items to be marked as errors
        :raises ValueError: if an item is already marked ``RECEIVED``; items
            preceding it in `ids` have already been marked as errors
        """
        for id_ in ids:
            if self._ledger_status(id_) == RECEIVED:
                logging.error(f'Attempting to mark a received item: {id_}')
                raise ValueError(f'Item already received: {id_}')
            self.ledger.set(self.ledger_id(id_), ERROR)

    def restart(self):
        """
        Restart the job. Every ledger key matching ``'<name>:*'`` is deleted,
        the in-memory served/received sets are cleared and the job is set to
        not exhausted.
        """
        # Materialise the keys first so the deletes do not interleave with
        # the ongoing SCAN.
        for key in list(self.ledger.scan_iter(f'{self.name}:*')):
            self.ledger.delete(key)
        self.served = set()
        self.received = set()
        self.exhausted = False

    def clean(self, output: bool = True):
        """
        Cleans the job. All served but not received items are removed from
        the ledger and from ``self.served``.

        :param output: also call ``self.writer.clean()`` (presumably removes
            the output of this job)
        """
        # Collect all SERVED keys before deleting any of them. A key that
        # vanished between SCAN and GET decodes to None and is skipped.
        served_keys = [
            key for key in self.ledger.scan_iter(f'{self.name}:*')
            if self._decode(self.ledger.get(key)) == SERVED
        ]
        for key in served_keys:
            self.ledger.delete(key)
            self.served.discard(self._item_id(key))
        if output:
            self.writer.clean()

    @property
    def status(self):
        """
        Returns the status of this job: ``COMPLETE`` once the reader is
        exhausted and no served item is awaiting receipt, ``IN_PROGRESS``
        otherwise.

        :return: job status
        """
        if self.exhausted and not self.served:
            return COMPLETE
        else:
            return IN_PROGRESS

    @property
    def stats(self):
        """
        Returns the report of this job: # items served and received, as well as
        the job status.

        :return: dict with ``'served'``, ``'received'`` and ``'status'`` keys
        """
        return {
            'served': len(self.served),
            'received': len(self.received),
            'status': self.status
        }

    def ledger_id(self, id_: Union[str, int]) -> str:
        """Return the ledger key ``'<name>:<id>'`` for item `id_`."""
        return f'{self.name}:{id_}'

    def _ledger_status(self, id_: Union[str, int]):
        """Return the decoded ledger status of item `id_`, or None if unset."""
        return self._decode(self.ledger.get(self.ledger_id(id_)))

    @staticmethod
    def _decode(value):
        """Decode a raw ledger value; None (missing key) is passed through."""
        return None if value is None else value.decode('utf8')

    @staticmethod
    def _item_id(key: bytes) -> int:
        """Extract the integer item id from a raw ledger key."""
        # Split on the LAST ':' so that job names containing ':' still parse;
        # ids are integers and therefore never contain ':' themselves.
        return int(key.decode('utf8').rsplit(':', 1)[1])

    @staticmethod
    def restore_records(job):
        """
        Rebuild ``job.served`` and ``job.received`` from the ledger entries
        matching ``'<name>:*'``. Other statuses (e.g. ``ERROR``) are not
        tracked in memory, and keys that vanished during the scan are skipped.

        :param job: the job whose in-memory records are restored
        """
        for key in job.ledger.scan_iter(f'{job.name}:*'):
            value = Job._decode(job.ledger.get(key))
            if value == SERVED:
                job.served.add(Job._item_id(key))
            elif value == RECEIVED:
                job.received.add(Job._item_id(key))

    @staticmethod
    def restore_job(job_name: str, job_key: str, ledger: Redis):
        """
        Recreate a job from the JSON record stored in the ledger at `job_key`.

        The record must contain ``reader_name``, ``writer_name``,
        ``metadata`` and ``mode``. The ``cont`` flag is not part of the
        record, so the restored job is never a repair job. Item records are
        restored by the ``Job`` constructor.

        :param job_name: name of the job to restore
        :param job_key: ledger key holding the job's JSON record
        :param ledger: Redis connection used as the ledger
        :return: the restored job, or None if no record exists
        """
        record = ledger.get(job_key)
        if not record:
            return
        record = json.loads(record.decode('utf8'))
        reader_name = record['reader_name']
        writer_name = record['writer_name']
        metadata = record['metadata']
        reader: Callable = get_io_object(reader_name, metadata)
        writer: Callable = get_io_object(writer_name, metadata)
        mode: str = record['mode']
        job: Job = Job(job_name, reader, writer, ledger, mode)
        return job


def get_io_object(name, metadata):
    """
    Instantiate the reader/writer called `name` from the ``io`` module.

    :param name: name of a class (or factory) defined in ``io``
    :param metadata: single argument passed to its constructor
    :return: the constructed object, or None if ``io`` has no such attribute
    :raises: whatever the constructor raises; errors inside the constructor
        are never mistaken for an unknown name
    """
    # Guard only the lookup: an AttributeError raised while constructing the
    # object is a genuine bug and must propagate instead of becoming None.
    # NOTE(review): `name` may come from a stored ledger record, so any public
    # attribute of ``io`` can be called here.
    factory = getattr(io, name, None)
    if factory is None:
        return None
    return factory(metadata)