# savkov/planchet
#
# View on GitHub
# app.py
#
# Summary
#
# Maintainability
# A
# 3 hrs
# Test Coverage
import json
import logging
import sys
from typing import List, Callable, Dict, Tuple, Union

from fastapi import FastAPI, HTTPException
from redis import Redis
from redis.exceptions import ConnectionError

from planchet.core import Job, COMPLETE, READ_ONLY, WRITE_ONLY, READ_WRITE
from planchet.config import (
    REDIS_HOST, REDIS_PORT, REDIS_PWD, MAX_PACKAGE_SIZE, MASTER_TOKEN
)
import planchet.io as io
import planchet.util as util

# Log format: the bare message only; the root logger is set to DEBUG level.
_fmt = '%(message)s'
logging.basicConfig(level=logging.DEBUG, format=_fmt)

# FastAPI application; the endpoints below are registered on it.
app = FastAPI()

logging.info(util.blue('PLANCHET IS STARTING!'))

# File paths of the writers attached to known jobs. `scramble` rejects a job
# whose writer targets one of these paths unless `force_overwrite` is set.
OUTPUT_REGISTRY = set()

# NOTE(review): this writes the master token to the log in clear text at
# import time -- confirm that this is acceptable for the deployment.
logging.info(util.yellow(f'MASTER TOKEN: {MASTER_TOKEN}'))


def _add_token(job_name: str, ledger: Redis, token: str):
    """
    Store the authentication token of a job in the ledger.

    Nothing is written when `token` is None.

    :param job_name: job name
    :param ledger: ledger that keeps the token record
    :param token: authentication token to store
    """
    if token is None:
        return
    token_key = f'TOKEN:{job_name}'
    ledger.set(token_key, token)


def _get_token(job_name: str, ledger: Redis) -> str:
    """
    Look up the authentication token stored for a job.

    :param job_name: job name
    :param ledger: ledger that keeps the token record
    :return: the stored token decoded as UTF-8; when the ledger returns a
        falsy value (e.g. no token stored) that value is returned unchanged
    """
    stored = ledger.get(f'TOKEN:{job_name}')
    if not stored:
        return stored
    return stored.decode('utf8')


def _remove_token(job_name: str, ledger: Redis):
    """
    Delete the authentication token record of a job from the ledger.

    :param job_name: job name
    :param ledger: ledger that keeps the token record
    """
    token_key = f'TOKEN:{job_name}'
    ledger.delete(token_key)


def _authenticate(job_name: str, ledger: Redis, token: str):
    """
    Check a token against the one registered for a job.

    Access is granted when the job has no token stored, when `token` equals
    the stored token, or when a master token is configured and `token`
    equals it.

    :param job_name: job name
    :param ledger: ledger that keeps the token record
    :param token: token supplied by the caller
    :raises HTTPException: 403 when none of the conditions above holds
    """
    expected: str = _get_token(job_name, ledger)
    master_ok: bool = MASTER_TOKEN is not None and token == MASTER_TOKEN

    # a job without a stored token is open to everybody
    if expected is None:
        return
    # either the job's own token or the master token unlocks the job
    if token == expected or master_ok:
        return
    raise HTTPException(status_code=403,
                        detail='Wrong or missing authentication token')


def _authenticate_master(token):
    """
    Check a token against the master token.

    Every token is accepted when no master token is configured.

    :param token: token supplied by the caller
    :raises HTTPException: 403 when a master token is configured and `token`
        differs from it
    """
    if MASTER_TOKEN is None or token == MASTER_TOKEN:
        return
    raise HTTPException(
        status_code=403,
        detail='Wrong or missing master authentication token')


def _make_io(reader_name, writer_name, metadata):
    """
    Instantiate the reader and the writer of a job.

    Each name is looked up as an attribute of the `io` module and the result
    is called with `metadata`. A falsy name yields None for that side.

    :param reader_name: name of the reader in `io`, or a falsy value
    :param writer_name: name of the writer in `io`, or a falsy value
    :param metadata: I/O classes configuration passed to both constructors
    :return: tuple of (reader, writer)
    :raises HTTPException: 400 when instantiation raises FileNotFoundError
        or PermissionError
    """
    reader = None
    writer = None
    try:
        if reader_name:
            reader = getattr(io, reader_name)(metadata)
        if writer_name:
            writer = getattr(io, writer_name)(metadata)
    except (FileNotFoundError, PermissionError) as err:
        # both failures are reported to the client as a bad request
        logging.error(err)
        raise HTTPException(status_code=400, detail=str(err))
    return reader, writer


def _load_jobs(ledger) -> Dict:
    """
    Restore every job recorded in the ledger.

    Scans the ledger for `JOB:*` keys, restores a job for each of them and
    registers the output path of its writer in `OUTPUT_REGISTRY`. Jobs whose
    restoration raises `json.JSONDecodeError` or `FileNotFoundError` are
    logged and skipped.

    :param ledger: ledger to restore the jobs from
    :return: mapping of job name to restored job
    """
    jobs: Dict = {}
    for job_key in ledger.scan_iter('JOB:*'):
        # keys look like `JOB:<name>`; keep everything after the first colon
        job_name: str = job_key.decode('utf8').split(':', 1)[1]
        try:
            job = Job.restore_job(job_name, job_key, ledger)
            jobs[job_name] = job
            # a job may have no writer (see the writer guards in `scramble`
            # and `receive`); only register an output path when there is one
            if job.writer:
                OUTPUT_REGISTRY.add(job.writer.file_path)
        except json.JSONDecodeError:
            logging.error(f'Could not restore job: {job_key}')
        except FileNotFoundError as e:
            logging.error(f'Could not restore job: {job_key}; {e}')
    return jobs


# Connect to the ledger and restore the recorded jobs at import time.
try:
    LEDGER: Redis = Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PWD)
    JOB_LOG: Dict = _load_jobs(LEDGER)
except ConnectionError:
    # There is no Redis connection; this fixes test imports
    # noinspection PyTypeChecker
    LEDGER = None  # type: ignore
    # start with an empty job log so that the endpoints can still refer to it
    JOB_LOG = {}
    logging.critical(
        util.redfill(f'Could not connect to redis at {REDIS_HOST}:{REDIS_PORT}'
                     f' using a password that was'
                     f'{" " if REDIS_PWD is None else " not"} None.'))


@app.post("/scramble")
def scramble(job_name: str, metadata: Dict, reader_name: str,
             writer_name: str, token: Union[str, None] = None,
             clean_start: bool = False,
             mode: str = READ_WRITE, cont: bool = False,
             force_overwrite: bool = False):
    """
    Create a job and record it in the ledger.

    :param job_name: job name
    :param metadata: I/O classes configuration
    :param reader_name: class reader name
    :param writer_name: class writer name
    :param token: authentication token for the job; default no authentication
    :param clean_start: restart the new job before it is logged
    :param mode: I/O mode
    :param cont: continue an existing job; its records are cleaned but its
        output file is kept
    :param force_overwrite: accept an output path that is already present in
        the output registry
    :raises HTTPException: 400 if the output path is already registered and
        `force_overwrite` is not set, or if the job exists and neither
        `clean_start` nor `cont` is set
    """
    logging.info(util.pink(
        f'SCRAMBLING: name->{job_name}; metadata->{metadata}; '
        f'reader_name->{reader_name}; writer_name->{writer_name}; '
        f'clean_start->{clean_start}'))
    reader, writer = _make_io(reader_name, writer_name, metadata)

    # an output path that is already registered may only be reused on demand
    if not force_overwrite:
        if writer and writer.file_path in OUTPUT_REGISTRY:
            raise HTTPException(
                status_code=400,
                detail=f'Output path not allowed `{writer.file_path}`.')

    ledger_key = f'JOB:{job_name}'
    stored = LEDGER.get(ledger_key)

    if stored:
        # an existing job can only be replaced or continued explicitly
        if not (clean_start or cont):
            raise HTTPException(status_code=400,
                                detail=f'Job {job_name} already exists.')
        if cont:
            # drop the records of the previous job but keep its output file
            JOB_LOG[job_name].clean(output=False)
            del JOB_LOG[job_name]

    created: Job = Job(job_name, reader, writer, LEDGER, mode, cont)

    # wipe the ledger entries of the job when a clean start was requested
    if clean_start:
        created.restart()

    # register the job in memory, in the output registry and in the ledger
    JOB_LOG[job_name] = created
    if writer:
        OUTPUT_REGISTRY.add(writer.file_path)
    record = {
        'metadata': metadata, 'reader_name': reader_name,
        'writer_name': writer_name, 'mode': mode}
    LEDGER.set(ledger_key, json.dumps(record))
    _add_token(job_name, LEDGER, token)


@app.post("/serve")
def serve(job_name: str, batch_size: int = 100,
          token: Union[str, None] = None) -> List:
    """
    Serve a batch of items to the user.

    :param job_name: job name
    :param batch_size: number of items to be served in the batch
    :param token: authentication token; leave empty for no authentication
    :return: list of items of size `batch_size`
    :raises HTTPException: 403 on failed authentication; 400 if the job is
        not in the job log or is write-only
    """
    _authenticate(job_name, LEDGER, token)
    try:
        job = JOB_LOG[job_name]
    except KeyError:
        # Job records live under the `JOB:` prefix (see `scramble`), so that
        # is the key to probe to tell a recorded-but-inactive job from an
        # unknown one.
        active = LEDGER.get(f'JOB:{job_name}')
        no_active_msg = f'No active job: {job_name}'
        no_known_msg = f'No known job: {job_name}'
        msg = no_active_msg if active else no_known_msg
        raise HTTPException(status_code=400, detail=msg)
    if job.mode == WRITE_ONLY:
        raise HTTPException(400, 'Trying to read from a write-only job')
    items = job.serve(batch_size)
    return items


@app.post("/receive")
def receive(job_name: str, items: List[Tuple[int, Union[Dict, List]]],
            overwrite: bool, token: Union[str, None] = None):
    """
    Receive a batch of processed items from the user.

    :param job_name: job name
    :param items: processed items, each a tuple of an int and a dict or list
    :param overwrite: overwrite the output file
    :param token: authentication token; default no authentication
    :raises HTTPException: 403 on failed authentication; 413 if the measured
        payload size exceeds `MAX_PACKAGE_SIZE`; 400 if the job is unknown,
        read-only or has no writer
    """
    _authenticate(job_name, LEDGER, token)
    # NOTE: `sys.getsizeof` is shallow -- it measures the list object itself,
    # not the items it refers to.
    size = sys.getsizeof(items)
    if size > MAX_PACKAGE_SIZE:
        msg = f'In-memory payload must be less than {MAX_PACKAGE_SIZE}; ' \
              f'the current size is {size}.'
        logging.error(util.red(msg))
        raise HTTPException(status_code=413, detail=msg)
    # report an unknown job as a bad request instead of an unhandled KeyError
    try:
        job = JOB_LOG[job_name]
    except KeyError:
        msg = f'Could not find a job named "{job_name}"'
        logging.info(util.pink(msg))
        raise HTTPException(400, msg)
    if job.mode == READ_ONLY:
        raise HTTPException(400, 'Trying to send to a read-only job')
    if not job.writer:
        raise HTTPException(400, 'No valid writer initialised')
    job.receive(items, overwrite)
    # once the job is complete its ledger record is replaced by the marker
    if job.status == COMPLETE:
        LEDGER.set(f'JOB:{job_name}', COMPLETE)


@app.post("/mark-errors")
def mark_errors(job_name: str, ids: List[int], token: Union[str, None] = None):
    """
    Mark a list of items as errors based on the IDs in `ids`.

    :param job_name: job name
    :param ids: list of IDs
    :param token: authentication token; default no authentication
    :raises HTTPException: 403 on failed authentication; 400 if the job is
        unknown or the job rejects the IDs with a ValueError
    """
    _authenticate(job_name, LEDGER, token)
    # report an unknown job as a bad request instead of an unhandled KeyError
    try:
        job = JOB_LOG[job_name]
    except KeyError:
        msg = f'Could not find a job named "{job_name}"'
        logging.info(util.pink(msg))
        raise HTTPException(400, msg)
    try:
        job.mark_errors(ids)
    except ValueError as e:
        raise HTTPException(400, str(e))


@app.get('/delete')
def delete(job_name: str, token: Union[str, None] = None):
    """
    Delete a job.

    Removes the job from the in-memory job log and deletes its job record,
    its token record and every `<job_name>:*` key from the ledger.

    :param job_name: job name
    :param token: authentication token; default no authentication
    :raises HTTPException: 403 on failed authentication
    """
    # deleting is guarded by the job token like the other job endpoints
    _authenticate(job_name, LEDGER, token)
    try:
        del JOB_LOG[job_name]
    except KeyError:
        # not fatal: the ledger may still hold records to remove
        logging.info(util.pink(f'Could not find a job named "{job_name}"'))
    LEDGER.delete(f'JOB:{job_name}')
    # drop the token too so that no record of the deleted job is left behind
    _remove_token(job_name, LEDGER)
    # materialise the scan before deleting the matched keys
    for record in list(LEDGER.scan_iter(f'{job_name}:*')):
        LEDGER.delete(record)


@app.get('/clean')
def clean(job_name: str, output: bool = True, token: Union[str, None] = None):
    """
    Remove all served but not received items from a job.

    After a successful clean the writer's output path is released from the
    output registry and the job token is removed from the ledger.

    :param job_name: job name
    :param output: clean output file(s) associated with the job
    :param token: authentication token; default no authentication
    :raises HTTPException: 403 on failed authentication; 400 if the job is
        unknown, or if cleaning raises FileNotFoundError or AttributeError
    """
    _authenticate(job_name, LEDGER, token)
    try:
        job = JOB_LOG[job_name]
    except KeyError:
        msg = f'Could not find a job name "{job_name}"'
        logging.info(util.pink(msg))
        raise HTTPException(400, msg)
    try:
        job.clean(output=output)
        # a job may have no writer (see the writer guards in `scramble` and
        # `receive`); then there is no output path to release
        if job.writer:
            OUTPUT_REGISTRY.discard(job.writer.file_path)
        _remove_token(job_name, LEDGER)
    except FileNotFoundError:
        msg = f'Could not find a output file for job "{job_name}"'
        logging.info(util.pink(msg))
        raise HTTPException(400, msg)
    except AttributeError:
        msg = f'No cleaning method found for writer type "{type(job.writer)}"'
        logging.info(util.pink(msg))
        raise HTTPException(400, msg)


@app.get('/purge')
def purge(output: bool = False, token: Union[str, None] = None):
    """
    Purge all jobs, items and optionally output files.

    :param output: purge output files if True
    :param token: master authentication token; default no authentication
    :raises HTTPException: 403 if a master token is configured and `token`
        does not match it
    """
    _authenticate_master(token)

    # remove all the logged jobs; iterate over a copy because the log is
    # modified inside the loop
    for name, job in list(JOB_LOG.items()):
        job.clean(output)
        del JOB_LOG[name]
        # a job may have no writer (see the writer guards in `scramble` and
        # `receive`); then there is no output path to release
        if job.writer:
            OUTPUT_REGISTRY.discard(job.writer.file_path)

    # nuke everything else
    for k in LEDGER.scan_iter('*'):
        LEDGER.delete(k)


@app.get("/report")
def report(job_name: str) -> Dict:
    """
    Return the statistics of a job.

    :param job_name: job name
    :return: the `stats` of the job, or an empty dict when a KeyError is
        raised while looking it up (e.g. the job is not in the job log)
    """
    try:
        stats = JOB_LOG[job_name].stats
    except KeyError:
        logging.info(util.pink(f'Could not find a job named "{job_name}"'))
        stats = {}
    return stats


@app.get("/health")
def health_check() -> Dict:
    """
    Service health check. Healthy if a live ledger can be reached.

    :return: service status: the Redis status ('Online' or 'Offline'), the
        number of logged jobs and the number of those that are complete
    """
    try:
        LEDGER.ping()
        status = 'Online'
    except (AttributeError, ConnectionError):
        # AttributeError: LEDGER is None because the connection failed at
        # import time; ConnectionError: the ledger cannot be reached now.
        logging.critical(util.redfill('REDIS IS OFFLINE'))
        status = 'Offline'

    # report a count, not the job objects themselves
    finished = sum(1 for job in JOB_LOG.values() if job.status == COMPLETE)

    return {
        'Redis status': status,
        'Number of jobs': len(JOB_LOG),
        'Number of finished jobs': finished
    }