HumanCellAtlas/ingest-archiver

View on GitHub
archiver/usi_api.py

Summary

Maintainability
A
3 hrs
Test Coverage
import json
import requests
import logging
import config

import requests.packages.urllib3.util.retry as retry

from ingest.utils.token_manager import TokenManager


class AAPTokenClient:
    def __init__(self, url=None, username=None, password=None):
        self.url = url if url else config.AAP_API_URL
        self.username = username if username else config.AAP_API_USER
        self.password = password if password else config.AAP_API_PASSWORD

    def retrieve_token(self):
        token = ''
        response = requests.get(self.url, auth=(self.username, self.password))
        if response.ok:
            token = response.text
        return token


USI_ENTITY_LINK = {
    'study': 'enaStudies',
    'sample': 'samples',
    'sequencingExperiment': 'sequencingExperiments',
    'project': 'projects',
    'sequencingRun': 'sequencingRuns'
}

USI_ENTITY_CURR_VERSION_LINK = {
    'study': 'studies',
    'sample': 'samples',
    'sequencingExperiment': 'assays',
    'project': 'projects',
    'sequencingRun': 'assayData'
}


class USIAPI:
    def __init__(self, url=None):
        self.logger = logging.getLogger(__name__)
        self.url = url if url else config.USI_API_URL
        self.logger.info(f'Using {self.url}')

        self.aap_api_domain = config.AAP_API_DOMAIN
        self.token_client = AAPTokenClient(url=config.AAP_API_URL)
        self.token_manager = TokenManager(token_client=self.token_client)
        retry_policy = retry.Retry(
            total=100,  # seems that this has a default value of 10,
            # setting this to a very high number so that it'll respect the status retry count
            status=17,  # status is the no. of retries if response is in status_forcelist,
            # this count will retry for ~20mins with back off timeout within
            read=10,
            status_forcelist=[500, 502, 503, 504],
            backoff_factor=0.6)
        self.session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(max_retries=retry_policy)
        self.session.mount('https://', adapter)

    def get_headers(self):
        return {
            'Content-type': 'application/json',
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + self.token_manager.get_token()
        }

    def create_submission(self):
        create_submissions_url = self.url + '/api/teams/' + self.aap_api_domain + '/submissions'
        return self._post(url=create_submissions_url, data_json={})

    def get_submission(self, url):
        return self._get(url=url)

    def delete_submission(self, delete_url):
        return self._delete(delete_url)

    def get_contents(self, get_contents_url):
        return self._get(get_contents_url)

    def get_entity_url(self, entity_type):
        return USI_ENTITY_LINK[entity_type]

    def get_submission_status(self, get_submission_status_url):
        return self._get(get_submission_status_url)

    def create_entity(self, create_entity_url, content):
        return self._post(create_entity_url, content)

    def create_samples(self, create_sample_url, samples):
        for sample in samples:
            converted_sample = self.converter.convert(sample)
            self.create_entity(create_sample_url, converted_sample)

    def get_available_statuses(self, get_available_statuses_url):
        return self._get_all(get_available_statuses_url, 'statusDescriptions')

    def get_validation_results(self, get_validation_results_url):
        return self._get_all(get_validation_results_url, 'validationResults')

    def get_validation_result_details(self, get_validation_result_url):
        return self._get(get_validation_result_url)

    def update_submission_status(self, usi_submission, new_status):
        submission_status_url = usi_submission['_links']['submissionStatus']['href']
        status_json = {"status": new_status}

        updated_submission = self._patch(submission_status_url, status_json)

        return updated_submission

    def get_processing_summary(self, usi_submission):
        get_summary_url = usi_submission['_links']['processingStatusSummary']['href']

        return self._get(get_summary_url)

    def get_processing_results(self, usi_submission):
        get_results_url = usi_submission['_links']['processingStatuses']['href']
        return self._get_all(get_results_url, 'processingStatuses')

    def get_current_version(self, entity_type, alias):
        entity_link = USI_ENTITY_CURR_VERSION_LINK[entity_type]
        url = f'{self.url}/api/{entity_link}/search/current-version?teamName={self.aap_api_domain}&alias={alias}'
        response = self.session.get(url, headers=self.get_headers())

        if response.status_code == requests.codes.not_found:
            return None
        elif response.status_code == requests.codes.ok:
            return response.json()
        else:
            response.raise_for_status()

    # ===

    def _get(self, url):
        response = self.session.get(url, headers=self.get_headers())
        return self._get_json(response)

    def _post(self, url, data_json):
        response = self.session.post(url, data=json.dumps(data_json), headers=self.get_headers())
        return self._get_json(response)

    def _patch(self, url, data_json):
        response = self.session.patch(url, data=json.dumps(data_json), headers=self.get_headers())
        return self._get_json(response)

    def _delete(self, delete_url):
        response = self.session.delete(delete_url, headers=self.get_headers())

        if response.ok:
            return True
        else:
            self.logger.error('Response:' + response.text)

        return False

    def _get_json(self, response):
        response.raise_for_status()
        return response.json()

    def _get_embedded_list(self, response, list_name):
        if response and "_embedded" in response:
            return response["_embedded"][list_name]
        return []

    def _get_all(self, url, entity_type):
        r = self.session.get(url, headers=self.get_headers())
        r.raise_for_status()
        if "_embedded" in r.json():
            for entity in r.json()["_embedded"][entity_type]:
                yield entity
            while "next" in r.json()["_links"]:
                r = self.session.get(r.json()["_links"]["next"]["href"], headers=self.get_headers())
                for entity in r.json()["_embedded"][entity_type]:
                    yield entity