ngi_pipeline/database/classes.py
from __future__ import print_function
import functools
import json
import os
import re
import requests
from ngi_pipeline.database.utils import load_charon_variables
from ngi_pipeline.log.loggers import minimal_logger
from requests.exceptions import Timeout
import six
LOG = minimal_logger(__name__)
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class CharonSession(six.with_metaclass(Singleton, requests.Session)):
def __init__(self, config=None, config_file_path=None):
super(CharonSession, self).__init__()
_charon_vars_dict = load_charon_variables(config=config,
config_file_path=config_file_path)
try:
self._api_token = _charon_vars_dict['charon_api_token']
self._api_token_dict = {'X-Charon-API-token': self._api_token}
# Remove trailing slashes
m = re.match(r'(?P<url>.*\w+)/*', _charon_vars_dict['charon_base_url'])
if m:
_charon_vars_dict['charon_base_url'] = m.groups()[0]
self._base_url = _charon_vars_dict['charon_base_url']
except KeyError as e:
raise ValueError('Unable to load needed Charon variable: {}'.format(e))
self.get = validate_response(functools.partial(self.get,
headers=self._api_token_dict, timeout=10))
self.post = validate_response(functools.partial(self.post,
headers=self._api_token_dict, timeout=10))
self.put = validate_response(functools.partial(self.put,
headers=self._api_token_dict, timeout=10))
self.delete = validate_response(functools.partial(self.delete,
headers=self._api_token_dict, timeout=10))
self._project_params = ('projectid', 'name', 'status', 'best_practice_analysis',
'sequencing_facility', 'delivery_status', 'delivery_token', 'delivery_projects')
self._project_reset_params = tuple(set(self._project_params) - \
set(['projectid', 'name',
'best_practice_analysis',
'sequencing_facility']))
self._sample_params = ('sampleid', 'status', 'analysis_status', 'qc_status',
'genotype_status', 'genotype_concordance',
'total_autosomal_coverage', 'total_sequenced_reads',
'delivery_status', 'duplication_pc', 'type', 'pair','delivery_token', 'delivery_projects')
self._sample_reset_params = tuple(set(self._sample_params) - \
set(['sampleid', 'total_sequenced_reads']))
self._libprep_params = ('libprepid', 'qc')
self._libprep_reset_params = tuple()
self._seqrun_params = ('seqrunid', 'lane_sequencing_status',
'alignment_status', 'genotype_status',
'total_reads', 'mean_autosomal_coverage')
self._seqrun_reset_params = tuple(set(self._seqrun_params) - \
set(['seqrunid', 'lane_sequencing_status',
'total_reads']))
def construct_charon_url(self, *args):
"""Build a Charon URL, appending any *args passed."""
return "{}/api/v1/{}".format(self._base_url,'/'.join([str(a) for a in args]))
def reset_base_url(self, charon_url):
LOG.info('Resetting Charon base URL from "{}" to "{}"'.format(self._base_url,
charon_url))
self._base_url = charon_url
# Project
def project_create(self, projectid, name=None, status=None,
best_practice_analysis=None, sequencing_facility=None):
l_dict = locals()
data = { k: l_dict.get(k) for k in self._project_params }
return self.post(self.construct_charon_url('project'),
data=json.dumps(data)).json()
def project_get(self, projectid):
return self.get(self.construct_charon_url('project', projectid)).json()
def project_get_samples(self, projectid):
return self.get(self.construct_charon_url('samples', projectid)).json()
def project_update(self, projectid, name=None, status=None, best_practice_analysis=None,
sequencing_facility=None, delivery_status=None, delivery_token=None, delivery_projects=None):
l_dict = locals()
data = { k: l_dict.get(k) for k in self._project_params if l_dict.get(k)}
return self.put(self.construct_charon_url('project', projectid),
data=json.dumps(data)).text
def projects_get_all(self):
return self.get(self.construct_charon_url('projects')).json()
def project_reset(self, projectid):
url = self.construct_charon_url("project", projectid)
data = { k: None for k in self._project_reset_params}
return self.put(url, json.dumps(data)).text
def project_delete(self, projectid):
return self.delete(self.construct_charon_url('project', projectid)).text
# Sample
def sample_create(self, projectid, sampleid, status=None, analysis_status=None,
qc_status=None, genotype_status=None,
genotype_concordance=None, total_autosomal_coverage=None,
total_sequenced_reads=None, delivery_status=None):
url = self.construct_charon_url("sample", projectid)
l_dict = locals()
data = { k: l_dict.get(k) for k in self._sample_params }
return self.post(url, json.dumps(data)).json()
def sample_get(self, projectid, sampleid):
url = self.construct_charon_url("sample", projectid, sampleid)
return self.get(url).json()
def sample_get_libpreps(self, projectid, sampleid):
return self.get(self.construct_charon_url('libpreps', projectid, sampleid)).json()
def sample_get_projects(self, sampleid):
return self.get(self.construct_charon_url('projectidsfromsampleid', sampleid)).json()
def sample_update(self, projectid, sampleid, status=None, analysis_status=None,
qc_status=None, genotype_status=None,
genotype_concordance=None, total_autosomal_coverage=None,
total_sequenced_reads=None, delivery_status=None, duplication_pc=None, delivery_token=None, delivery_projects=None ):
url = self.construct_charon_url("sample", projectid, sampleid)
l_dict = locals()
data = { k: l_dict.get(k) for k in self._sample_params if l_dict.get(k)}
return self.put(url, json.dumps(data)).text
def sample_reset(self, projectid, sampleid):
url = self.construct_charon_url("sample", projectid, sampleid)
data = { k: None for k in self._sample_reset_params}
return self.put(url, json.dumps(data)).text
def sample_delete(self, projectid, sampleid):
return self.delete(self.construct_charon_url("sample", projectid, sampleid))
# LibPrep
def libprep_create(self, projectid, sampleid, libprepid, qc=None):
url = self.construct_charon_url("libprep", projectid, sampleid)
l_dict = locals()
data = { k: l_dict.get(k) for k in self._libprep_params }
return self.post(url, json.dumps(data)).json()
def libprep_get(self, projectid, sampleid, libprepid):
url = self.construct_charon_url("libprep", projectid, sampleid, libprepid)
return self.get(url).json()
def libprep_get_seqruns(self, projectid, sampleid, libprepid):
return self.get(self.construct_charon_url('seqruns', projectid, sampleid, libprepid)).json()
def libprep_update(self, projectid, sampleid, libprepid, qc=None):
url = self.construct_charon_url("libprep", projectid, sampleid, libprepid)
l_dict = locals()
data = { k: l_dict.get(k) for k in self._libprep_params if l_dict.get(k)}
return self.put(url, json.dumps(data)).text
def libprep_reset(self, projectid, sampleid, libprepid):
url = self.construct_charon_url("libprep", projectid, sampleid, libprepid)
data = { k: None for k in self._libprep_reset_params}
return self.put(url, json.dumps(data)).text
def libprep_delete(self, projectid, sampleid, libprepid):
return self.delete(self.construct_charon_url("libprep", projectid, sampleid, libprepid))
# SeqRun
def seqrun_create(self, projectid, sampleid, libprepid, seqrunid,
lane_sequencing_status=None, alignment_status=None,
genotype_status=None, runid=None, total_reads=None,
mean_autosomal_coverage=None):
url = self.construct_charon_url("seqrun", projectid, sampleid, libprepid)
l_dict = locals()
data = { k: l_dict.get(k) for k in self._seqrun_params }
return self.post(url, json.dumps(data)).json()
def seqrun_get(self, projectid, sampleid, libprepid, seqrunid):
url = self.construct_charon_url("seqrun", projectid, sampleid, libprepid, seqrunid)
return self.get(url).json()
def seqrun_update(self, projectid, sampleid, libprepid, seqrunid,
lane_sequencing_status=None, alignment_status=None,
genotype_status=None, runid=None, total_reads=None,
mean_autosomal_coverage=None, *args, **kwargs):
if args: LOG.debug("Ignoring extra args: {}".format(", ".join(*args)))
if kwargs: LOG.debug("Ignoring extra kwargs: {}".format(", ".join(["{}: {}".format(k,v) for k,v in kwargs.items()])))
url = self.construct_charon_url("seqrun", projectid, sampleid, libprepid, seqrunid)
l_dict = locals()
data = { k: str(l_dict.get(k)) for k in self._seqrun_params if l_dict.get(k)}
return self.put(url, json.dumps(data)).text
def seqrun_reset(self, projectid, sampleid, libprepid, seqrunid):
url = self.construct_charon_url("seqrun", projectid, sampleid, libprepid, seqrunid)
data = { k: None for k in self._seqrun_reset_params}
return self.put(url, json.dumps(data)).text
def seqrun_delete(self, projectid, sampleid, libprepid, seqrunid):
return self.delete(self.construct_charon_url("seqrun", projectid, sampleid, libprepid, seqrunid))
class CharonError(Exception):
def __init__(self, message, status_code=None, *args, **kwargs):
self.status_code = status_code
super(CharonError, self).__init__(message, *args, **kwargs)
class validate_response(object):
"""
Validate or raise an appropriate exception for a Charon API query.
"""
def __init__(self, f):
self.f = f
## Should these be class attributes? I don't really know
self.SUCCESS_CODES = (200, 201, 204)
# There are certainly more failure codes I need to add here
self.FAILURE_CODES = {
400: (CharonError, ("Charon access failure: invalid input "
"data (reason '{response.reason}' / "
"code {response.status_code} / "
"url '{response.url}')")),
404: (CharonError, ("Charon access failure: not found "
"in database (reason '{response.reason}' / "
"code {response.status_code} / "
"url '{response.url}')")), # when else can we get this? malformed URL?
405: (CharonError, ("Charon access failure: method not "
"allowed (reason '{response.reason}' / "
"code {response.status_code} / "
"url '{response.url}')")),
408: (CharonError, ("Charon access failure: connection timed out")),
409: (CharonError, ("Charon access failure: document "
"revision conflict (reason '{response.reason}' / "
"code {response.status_code} / "
"url '{response.url}')")),}
def __call__(self, *args, **kwargs):
try:
response = self.f(*args, **kwargs)
except Timeout as e:
c_e = CharonError(e)
c_e.status_code = 408
raise c_e
if response.status_code not in self.SUCCESS_CODES:
try:
err_type, err_msg = self.FAILURE_CODES[response.status_code]
except KeyError:
# Error code undefined, used generic text
err_type = CharonError
err_msg = ("Charon access failure: {response.reason} "
"(code {response.status_code} / url '{response.url}')")
raise err_type(err_msg.format(**locals()), response.status_code)
return response