savkov/planchet

View on GitHub
planchet/client.py

Summary

Maintainability
A
1 hr
Test Coverage
import json
import logging
from typing import Any, Dict, List, Union, Tuple
from urllib.parse import urlencode

from requests import Response

from .util import requests_retry_session


_fmt = '%(message)s'
logging.basicConfig(level=logging.DEBUG, format=_fmt)


class PlanchetClient:
    """ The PlanchetClient object provides an easy connectivity to a Planchet
    instance. It is essentially a convenience wrapper around the
    `requests library <https://requests.readthedocs.io/en/master/>`_.

    :param url: Planchet URL, e.g. `<http://localhost:5005>`_

    Attributes:
        RETRIES:    Default number of retries for requests.
    """

    RETRIES = 5

    def __init__(self, url: str):
        # Normalise to a trailing slash so endpoint names can be appended
        # directly when building request URLs.
        self.url = url if url.endswith('/') else url + '/'

    @staticmethod
    def _with_token(params: Dict, token: Union[str, None]) -> Dict:
        """Return a copy of ``params`` with ``token`` appended if given."""
        if token is None:
            return dict(params)
        return dict(params, token=token)

    def start_job(self, job_name: str, metadata: Dict, reader_name: str,
                  writer_name: str, clean_start: bool = False,
                  token: Union[str, None] = None,
                  retries: int = 1, mode: str = 'read-write',
                  cont: bool = False) -> Response:
        """
        Starts a job.

        Some jobs that you could run are:

        **Regular job:** use `job_name`, `metadata`, `reader_name` &
        `writer_name`.

        **Repair job:** like regular but set `cont` to True.


        :param job_name: name of the job
        :param metadata: metadata for the reader and writer classes, typically
           including the input/output file paths and others; sent as the
           JSON request body.
        :param reader_name: name of the reader class, e.g. `CsvReader`.
        :param writer_name: name of the writer class, e.g. `CsvWriter`.
        :param clean_start: cleans all items in the ledger (Redis) before
           starting the job.
        :param token: authentication token; no authentication if empty
        :param retries: number of times to retry this request; note that the
           default here is 1, not ``RETRIES``.
        :param mode: io mode; possible values are `read`, `write`, and
           the default `read-write`
        :param cont: makes the job a repair job resetting the reader iterator
           and cleaning all served but not received items.
        :return: the server response
        """
        params = self._with_token({
            'job_name': job_name,
            'reader_name': reader_name,
            'writer_name': writer_name,
            'mode': mode,
            'clean_start': clean_start,
            'cont': cont,
        }, token)
        url = self.make_param_url('scramble', params)
        session = requests_retry_session(retries=retries)
        return session.post(url=url, json=metadata)

    def delete_job(self, job_name: str, token: Union[str, None] = None,
                   retries: int = RETRIES) -> Response:
        """
        Deletes all references to a job, including the job metadata and the
        items associated with it.

        :param job_name: job name
        :param token: authentication token; no authentication if empty
        :param retries: number of retries for this request
        :return: the server response
        """
        session = requests_retry_session(retries=retries)
        params = self._with_token({'job_name': job_name}, token)
        url = self.make_param_url('delete', params)
        return session.get(url=url)

    def clean_job(self, job_name: str, token: Union[str, None] = None,
                  retries: int = RETRIES) -> Response:
        """
        Remove all items associated with a job.

        :param job_name: job name
        :param token: authentication token; no authentication if empty
        :param retries: number of retries for this request
        :return: the server response
        """
        session = requests_retry_session(retries=retries)
        params = self._with_token({'job_name': job_name}, token)
        url = self.make_param_url('clean', params)
        return session.get(url=url)

    def purge_server(self, master_token: str, output: bool = True,
                     retries: int = RETRIES) -> Response:
        """
        Remove all jobs, items and optionally delete all output files from
        the server.

        :param master_token: master authentication token for the server
        :param output: deletes output file if true
        :param retries: number of retries for this request
        :return: the server response
        """
        session = requests_retry_session(retries=retries)
        params = {'token': master_token, 'output': output}
        url = self.make_param_url('purge', params)
        return session.get(url=url)

    def get_job_report(self, job_name: str, retries: int = RETRIES
                       ) -> Union[Dict, None]:
        """
        Request the job report from Planchet.

        Example report:

        .. code-block:: python

           {
              'served': 20,
              'received': 20,
              'status': 'IN_PROGRESS'
           }


        :param job_name: job name
        :param retries: number of retries for this request
        :return: the decoded JSON report, or ``None`` if the server did not
           answer with HTTP 200
        """
        session = requests_retry_session(retries=retries)
        # Go through make_param_url so the job name is percent-encoded.
        url = self.make_param_url('report', {'job_name': job_name})
        response = session.get(url=url)
        if response.status_code == 200:
            return json.loads(response.text)
        return None

    def get(self, job_name: str, n_items: int,
            token: Union[str, None] = None,
            retries: int = RETRIES) -> Union[List, None]:
        """
        Request a batch of items from `job_name`.

        :param job_name: job name
        :param n_items: number of items in the batch
        :param token: authentication token; no authentication if empty
        :param retries: number of retries for this request
        :return: the decoded JSON batch, or ``None`` if the server did not
           answer with HTTP 200
        """
        session = requests_retry_session(retries=retries)
        params = self._with_token(
            {'job_name': job_name, 'batch_size': n_items}, token)
        url = self.make_param_url('serve', params)
        response = session.post(url=url)
        if response.status_code == 200:
            return json.loads(response.text)
        return None

    def send(self, job_name: str, items: List[Tuple[int, Union[Dict, List]]],
             token: Union[str, None] = None,
             overwrite: bool = False, retries: int = RETRIES) -> Response:
        """
        Send a batch of processed items from `job_name` to Planchet.

        :param job_name: job name
        :param items: processed items, sent as the JSON request body
        :param token: authentication token; no authentication if empty
        :param overwrite: overwrite the output file (discouraged; a warning
           is logged when set)
        :param retries: number of retries for this request
        :return: the server response
        """
        session = requests_retry_session(retries=retries)
        if overwrite:
            logging.warning('The overwrite parameter is discouraged and will '
                            'be removed in the next major release.')
        params = self._with_token(
            {'job_name': job_name, 'overwrite': overwrite}, token)
        url = self.make_param_url('receive', params)
        return session.post(url=url, json=items)

    def mark_errors(self, job_name: str, ids: List[int],
                    token: Union[str, None] = None,
                    retries: int = RETRIES) -> Response:
        """
        Mark a list of item IDs as errors.

        :param job_name: job name
        :param ids: list of item IDs, sent as the JSON request body
        :param token: authentication token; no authentication if empty
        :param retries: number of retries for this request
        :return: the server response
        """
        session = requests_retry_session(retries=retries)
        params = self._with_token({'job_name': job_name}, token)
        url = self.make_param_url('mark-errors', params)
        return session.post(url=url, json=ids)

    def check(self, retries: int = RETRIES) -> Any:
        """
        Check if Planchet is healthy.

        :param retries: number of retries for this request
        :return: the decoded JSON body of the health endpoint, or ``None`` if
           the server did not answer with HTTP 200
        """
        session = requests_retry_session(retries=retries)
        response = session.get(url=self.make_param_url('health', {}))
        if response.status_code == 200:
            return json.loads(response.text)
        return None

    def make_param_url(self, endpoint: str, params: Dict) -> str:
        """
        Build ``<base url><endpoint>?<query string>`` from ``params``.

        Keys and values are percent-encoded (non-string values are converted
        with ``str`` first, so booleans are still sent as ``True``/``False``).
        This keeps job names or tokens containing ``&``, ``=``, ``#``, ``+``,
        spaces or non-ASCII characters from corrupting the query string.

        :param endpoint: endpoint path relative to the base URL, e.g. `serve`
        :param params: query parameters; may be empty
        :return: the full request URL (no ``?`` when ``params`` is empty)
        """
        query = urlencode(params)
        if not query:
            return f'{self.url}{endpoint}'
        return f'{self.url}{endpoint}?{query}'