# archiver/converter.py
import logging
import re
from flatten_json import flatten
from archiver.ontology_api import OntologyAPI
"""
HCA to USI JSON Mapping
https://docs.google.com/document/d/1yXTelUt-CvlI7-Jkh7K_NCPIBfhRXMvjT4wRkyxpIN0/edit#
"""
class Converter:
    """Base converter from nested HCA metadata to a flat output document.

    The conversion pipeline run by :meth:`convert` is:

    1. ``_flatten``         - flatten the nested input with ``__`` separators
       and drop excluded keys.
    2. ``_extract_fields``  - copy mapped fields and collect attributes.
    3. ``_build_output``    - hook for subclasses to add type-specific fields.
    4. ``rename_attributes``- turn flattened attribute keys into readable names.

    Subclasses customise behaviour by overriding the instance attributes set
    in ``__init__`` and/or the ``_build_output`` hook.
    """

    def __init__(self, ontology_api=None):
        """Set up default conversion settings.

        Args:
            ontology_api: object exposing ``expand_curie(curie)``; it is used
                when an ontology attribute is extracted.
        """
        self.logger = logging.getLogger(__name__)
        # Flattened input key -> top-level output field name.
        self.field_mapping = {
            "uuid__uuid": "alias",
            "submissionDate": "releaseDate"
        }
        # Prepended to the output "alias" at the end of convert().
        self.alias_prefix = ''
        # Top-level input keys removed before flattening.
        self.exclude_data = []
        # Flattened keys containing any of these substrings are dropped.
        self.exclude_fields_match = ['__schema_type', '__describedBy', '__ontology_label']
        self.ingest_api = None
        self.ontology_api = ontology_api
        # When True, the part of an attribute name before the first "__" is dropped.
        self.remove_input_prefix = False
        # When True, attribute names are lower-cased instead of title-cased.
        self.to_lowercase_attributes = False

    def convert(self, hca_data):
        """Convert ``hca_data`` and return the converted dict.

        Raises:
            ConversionError: when a ``KeyError`` occurs anywhere in the
                pipeline; the original ``KeyError`` is kept as ``__cause__``.
        """
        try:
            flattened_hca_data = self._flatten(hca_data)
            extracted_data = self._extract_fields(flattened_hca_data, hca_data)
            converted_data = self._build_output(extracted_data, flattened_hca_data, hca_data=hca_data)
            converted_data = self.rename_attributes(converted_data, hca_data)
            # Prefix the alias on the document that is actually returned, so
            # the prefix survives even if _build_output returns a new dict.
            converted_data["alias"] = f'{self.alias_prefix}{converted_data["alias"]}'
        except KeyError as e:
            error_message = "Error:" + str(e)
            self.logger.error(error_message)
            raise ConversionError("Conversion Error",
                                  "An error occurred in converting the metadata. Data maybe malformed.",
                                  details={'data': hca_data}) from e
        return converted_data

    def _flatten(self, hca_data):
        """Return ``hca_data`` flattened with ``__`` as separator.

        Keys listed in ``exclude_data`` are removed from the top level first;
        flattened keys containing any ``exclude_fields_match`` substring are
        removed afterwards. The input is not modified.
        """
        input_data = {
            key: value
            for key, value in dict(hca_data).items()
            if key not in self.exclude_data
        }
        flattened = flatten(input_data, '__')

        if not self.exclude_fields_match:
            return flattened

        return {
            key: value
            for key, value in flattened.items()
            if not any(keyword in key for keyword in self.exclude_fields_match)
        }

    def _extract_fields(self, flattened_hca_data, hca_data):
        """Build the initial output dict from the flattened data.

        Every ``field_mapping`` target is present in the result (empty string
        when the source key is missing). ``attributes`` holds the content
        attributes plus one UUID attribute per dict/list entity in ``hca_data``.

        Raises:
            KeyError: if a dict/list entity has no ``["uuid"]["uuid"]``.
        """
        extracted_data = {
            new_key: flattened_hca_data.get(key, "")
            for key, new_key in self.field_mapping.items()
        }
        attributes = self._extract_attributes(flattened_hca_data)
        extracted_data["attributes"] = attributes

        for input_key, entity in hca_data.items():
            if isinstance(entity, dict):
                name = f"HCA {input_key.replace('_', ' ').title()} UUID"
                attributes[name] = [dict(value=entity["uuid"]["uuid"])]
            elif isinstance(entity, list):
                name = f"HCA {input_key.replace('_', ' ').title()} UUID's"
                uuids = ', '.join(e["uuid"]["uuid"] for e in entity)
                attributes[name] = [dict(value=uuids)]
        return extracted_data

    def _extract_attributes(self, flattened_hca_data):
        """Collect attributes from flattened keys that contain ``__content__``.

        * ``...__ontology`` keys become ``{"value": <sibling __text>, "terms":
          [{"url": expanded curie}]}``.
        * ``...__text`` keys are skipped (consumed by their ontology sibling).
        * every other key becomes ``{"value": <value>}``.

        Keys present in ``field_mapping`` are ignored. ``content__`` is
        stripped from the resulting attribute names.
        """
        attributes = {}
        prefix = "content__"
        ontology_keyword = "__ontology"
        ontology_text_keyword = "__text"
        content_marker = f'__{prefix}'

        for key, value in flattened_hca_data.items():
            if content_marker not in key or key in self.field_mapping:
                continue

            if ontology_keyword in key:
                text_field = key.replace(ontology_keyword, ontology_text_keyword)
                attr = {
                    "value": flattened_hca_data.get(text_field, ''),
                    "terms": [{
                        "url": self.ontology_api.expand_curie(value)
                    }]
                }
                name = text_field.replace(prefix, '').replace(ontology_text_keyword, '')
                attributes[name] = [attr]
            elif ontology_text_keyword in key:
                # Text labels are emitted together with their ontology key.
                continue
            else:
                attributes[key.replace(prefix, '')] = [dict(value=value)]
        return attributes

    def _build_output(self, extracted_data, flattened_hca_data=None, hca_data=None):
        """Hook for subclasses; the base implementation returns the data as is."""
        return extracted_data

    def rename_attributes(self, converted_data, hca_data):
        """Replace flattened attribute keys with readable names.

        Keys without ``__`` are kept unchanged. For the others: optionally
        drop the part before the first ``__`` (``remove_input_prefix``),
        replace ``__`` with `` - `` and ``_`` with a space, then lower-case or
        title-case the result depending on ``to_lowercase_attributes``.

        A missing or ``None`` ``attributes`` entry is treated as empty.
        """
        attributes = converted_data.get('attributes') or {}
        new_attributes = {}
        for field, value in attributes.items():
            if '__' not in field:
                new_attributes[field] = value
                continue

            new_field = field
            if self.remove_input_prefix:
                new_field = field.split('__', 1)[-1]
            new_field = new_field.replace('__', ' - ').replace('_', ' ')
            if self.to_lowercase_attributes:
                new_field = new_field.lower()
            else:
                new_field = new_field.title()
            new_attributes[new_field] = value

        converted_data['attributes'] = new_attributes
        return converted_data
class SampleConverter(Converter):
    """Converter for biomaterial metadata (input key ``biomaterial``).

    ``ingest_api`` must be assigned before converting: ``_get_concrete_type``
    calls ``self.ingest_api.get_concrete_entity_type``.
    """

    def __init__(self, ontology_api):
        super().__init__(ontology_api)
        self.logger = logging.getLogger(__name__)
        self.field_mapping = {
            "biomaterial__uuid__uuid": "alias",
            "biomaterial__content__biomaterial_core__biomaterial_name": "title",
            "biomaterial__content__biomaterial_core__biomaterial_description": "description",
            "biomaterial__content__biomaterial_core__ncbi_taxon_id__0": "taxonId",
            "biomaterial__submissionDate": "releaseDate"
        }
        # TODO local mapping for now, ideally this should be an OLS lookup
        # TODO what's taxon id for mouse
        self.taxon_map = {
            "9606": "Homo sapiens",
            "10090": "Mus musculus"
        }
        self.exclude_data = ['genus_species']
        self.exclude_fields_match = ['__schema_type', '__describedBy',
                                     '__ontology_label',
                                     # FIXME only donors contain this info but this is redundant with taxonId, removing this if it exists
                                     'biomaterial__content__genus_species__0__ontology',
                                     'biomaterial__content__genus_species__0__text']
        self.remove_input_prefix = True

    def _build_output(self, extracted_data, flattened_hca_data, hca_data):
        """Add sample-specific fields to ``extracted_data`` and return it.

        Truncates ``releaseDate`` at the first ``T``, resolves ``taxon`` from
        ``taxonId`` through ``taxon_map`` and adds the biomaterial type and
        project attributes.

        Raises:
            ConversionError: when ``taxonId`` has no entry in ``taxon_map``.
        """
        extracted_data["releaseDate"] = extracted_data["releaseDate"].split('T')[0]
        extracted_data["sampleRelationships"] = []

        # Looked up once; the taxon id is stringified because taxon_map is
        # keyed by strings.
        taxon_id = str(extracted_data.get("taxonId", ''))
        extracted_data["taxon"] = self.taxon_map.get(taxon_id)
        if not extracted_data["taxon"]:
            raise ConversionError("Sample Conversion Error",
                                  f"Sample Converter could not find the taxon text from taxon id, {taxon_id}",
                                  details={'taxon_id': taxon_id})

        if not extracted_data.get("attributes"):
            extracted_data["attributes"] = {}

        concrete_type = self._get_concrete_type(hca_data.get('biomaterial'))
        extracted_data["attributes"]["HCA Biomaterial Type"] = [dict(value=concrete_type)]
        extracted_data["attributes"]["project"] = [dict(value="Human Cell Atlas")]
        return extracted_data

    def _get_concrete_type(self, entity):
        """Return the concrete entity type reported by the ingest API."""
        return self.ingest_api.get_concrete_entity_type(entity)
class SequencingExperimentConverter(Converter):
    """Converter producing a sequencing experiment document.

    Reads flattened keys under ``process``, ``sequencing_protocol``,
    ``library_preparation_protocol`` and ``input_biomaterial``.
    """

    def __init__(self, ontology_api):
        super().__init__(ontology_api)
        self.logger = logging.getLogger(__name__)
        self.alias_prefix = 'sequencingExperiment_'

        # Primer value -> library_selection value; anything else maps to "unspecified".
        self.library_selection_mapping = {
            "poly-dT": "Oligo-dT",
            "random": "RANDOM",
        }

        # Lower-cased instrument model text -> output values.
        # NOTE(review): the "intrument_model" / "synonymns" spellings are kept
        # as-is because they are runtime keys read below.
        self.instrument_model_map = {
            "illumina genome analyzer": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina Genome Analyzer"
            },
            "illumina genome analyzer ii": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina Genome Analyzer II"
            },
            "illumina genome analyzer iix": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina Genome Analyzer IIx"
            },
            "illumina hiseq 2500": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina HiSeq 2500"
            },
            "illumina hiseq 2000": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina HiSeq 2000"
            },
            "illumina hiseq 1500": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina HiSeq 1500"
            },
            "illumina hiseq 1000": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina HiSeq 1000"
            },
            "illumina miseq": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina MiSeq"
            },
            "illumina hiscansq": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina HiScanSQ"
            },
            "hiseq x ten": {
                "platform_type": "ILLUMINA",
                "intrument_model": "HiSeq X Ten",
                "synonymns": [
                    "illumina hiseq x 10"
                ]
            },
            "nextseq 500": {
                "platform_type": "ILLUMINA",
                "intrument_model": "NextSeq 500",
                "synonymns": [
                    "illumina nextseq 500"
                ]
            },
            "hiseq x five": {
                "platform_type": "ILLUMINA",
                "intrument_model": "HiSeq X Five",
            },
            "illumina hiseq 3000": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina HiSeq 3000"
            },
            "illumina hiseq 4000": {
                "platform_type": "ILLUMINA",
                "intrument_model": "Illumina HiSeq 4000"
            },
            "nextseq 550": {
                "platform_type": "ILLUMINA",
                "intrument_model": "NextSeq 550",
            }
        }

        self.field_mapping = {
            "process__uuid__uuid": "alias",
            "sequencing_protocol__content__protocol_core__protocol_name": "title",
            "sequencing_protocol__content__protocol_core__protocol_description": "description"
        }

    def _build_output(self, extracted_data, flattened_hca_data, hca_data=None):
        """Add library, instrument and link fields to ``extracted_data``.

        A missing or unknown instrument model text yields ``"unspecified"``
        for both ``instrument_model`` and ``platform_type``.

        Raises:
            ConversionError: when the input biomaterial has no biomaterial id.
        """
        extracted_data["studyRef"] = {}
        extracted_data["sampleUses"] = []

        if not extracted_data.get("attributes"):
            extracted_data["attributes"] = {}
        attributes = extracted_data["attributes"]

        attributes["library_strategy"] = [dict(value="OTHER")]
        attributes["library_source"] = [dict(value="TRANSCRIPTOMIC SINGLE CELL")]

        primer = flattened_hca_data.get("library_preparation_protocol__content__primer")
        if primer:
            attributes["library_selection"] = [
                dict(value=self.library_selection_mapping.get(primer, "unspecified"))]

        paired_end = flattened_hca_data.get("sequencing_protocol__content__paired_end")
        if paired_end:
            attributes["library_layout"] = [dict(value="PAIRED")]
            # TODO put 0 as default as we don't really capture this in HCA but there's no way to specify 'unspecified'
            attributes["nominal_length"] = [dict(value="0")]
            attributes["nominal_sdev"] = [dict(value="0")]
        else:
            attributes["library_layout"] = [dict(value="SINGLE")]

        # must correctly match ENA enum values
        instr_model_text = flattened_hca_data.get(
            "sequencing_protocol__content__instrument_manufacturer_model__text")
        # The text may be absent; fall back to "" so the lookup yields
        # "unspecified" instead of failing on None.lower().
        model_key = (instr_model_text or "").lower()

        instrument_model_obj = self.instrument_model_map.get(model_key, {})
        instrument_model = instrument_model_obj.get('intrument_model', 'unspecified')
        platform_type = instrument_model_obj.get('platform_type', 'unspecified')

        # A synonym match overrides the direct lookup.
        for obj in self.instrument_model_map.values():
            synonyms = obj.get("synonymns")
            if synonyms and model_key in synonyms:
                instrument_model = obj.get('intrument_model', 'unspecified')
                platform_type = obj.get('platform_type', 'unspecified')

        attributes["instrument_model"] = [dict(value=instrument_model)]
        attributes["platform_type"] = [dict(value=platform_type)]
        attributes["design_description"] = [dict(value="unspecified")]

        library_name = flattened_hca_data.get(
            "input_biomaterial__content__biomaterial_core__biomaterial_id", "")
        if not library_name:
            raise ConversionError("Sequencing Experiment Conversion Error",
                                  "There is no id found for the input biomaterial.")
        attributes["library_name"] = [dict(value=library_name)]

        self._build_links(extracted_data, {})
        return extracted_data

    # TODO implement
    def _build_links(self, extracted_data, links):
        """Set placeholder study and sample references (``links`` is unused)."""
        extracted_data["studyRef"] = {"alias": "{studyAlias.placeholder}"}
        extracted_data["sampleUses"] = [{"sampleRef": {"alias": "{sampleAlias.placeholder}"}}]
class SequencingRunConverter(Converter):
    """Converter producing a sequencing run document with its file list."""

    def __init__(self, ontology_api):
        super().__init__(ontology_api)
        self.logger = logging.getLogger(__name__)
        self.alias_prefix = 'sequencingRun_'
        self.field_mapping = {
            "process__uuid__uuid": "alias",
            "process__content__process_core__process_name": "title",
            "process__content__process_core__process_description": "description"
        }
        # These top-level keys are dropped before flattening; convert() reads
        # them straight from the raw input instead.
        self.exclude_data = ['bundle_uuid', 'library_preparation_protocol']
        # Library construction approach that yields a single per-bundle BAM.
        self.ONTOLOGY_10x = "EFO:0009310"
        # HCA file format -> output file type.
        self.file_format = {
            'fastq.gz': 'fastq',
            'bam': 'bam',
            'cram': 'cram',
        }

    def convert(self, hca_data):
        """Run the base conversion, then attach the ``files`` list.

        When the library construction approach ontology equals
        ``ONTOLOGY_10x`` a single ``<bundle_uuid>.bam`` entry is emitted;
        otherwise one entry per item of ``hca_data['files']``.

        Raises:
            ConversionError: from the base conversion.
            KeyError: when a required key is missing or a file format is not
                in ``file_format``.
        """
        converted_data = super().convert(hca_data)

        protocol = hca_data.get("library_preparation_protocol", {})
        protocol_content = protocol.get("content", {})
        approach = protocol_content.get("library_construction_approach", {}).get('ontology')

        if approach and approach == self.ONTOLOGY_10x:
            file_entries = [{
                'name': f"{hca_data['bundle_uuid']}.bam",
                'type': 'bam'
            }]
        else:
            file_entries = [self._describe_file(item) for item in hca_data['files']]

        converted_data['files'] = file_entries
        return converted_data

    def _describe_file(self, file_entity):
        """Return the ``{'name', 'type'}`` entry for one input file entity."""
        flat = self._flatten(file_entity)
        return {
            'name': flat.get('content__file_core__file_name'),
            'type': self.file_format[flat.get('content__file_core__file_format')]
        }

    def _build_output(self, extracted_data, flattened_hca_data, hca_data=None):
        """Attach the link placeholders and return ``extracted_data``."""
        self._build_links(extracted_data, {})
        return extracted_data

    # TODO implement
    def _build_links(self, extracted_data, links):
        """Set the placeholder assay reference (``links`` is unused)."""
        extracted_data["assayRefs"] = {"alias": "{assayAlias.placeholder}"}
class ProjectConverter(Converter):
    """Converter producing a project document with contacts, publications and funders."""

    def __init__(self, ontology_api):
        super().__init__(ontology_api)
        self.logger = logging.getLogger(__name__)
        self.field_mapping = {
            "project__uuid__uuid": "alias",
            "project__content__project_core__project_title": "title",
            "project__content__project_core__project_description": "description",
            "project__submissionDate": "releaseDate"
        }
        self.alias_prefix = 'project_'
        # Contributors, publications and funders are rebuilt from the raw
        # input in _build_output, so their flattened keys are excluded.
        self.exclude_fields_match = ['__schema_type', '__describedBy',
                                     '__contributors', '__publications',
                                     '__funders']
        self.remove_input_prefix = True

    def _build_output(self, extracted_data, flattened_hca_data, hca_data=None):
        """Add project-specific fields to ``extracted_data`` and return it.

        * Titles shorter than 25 characters get the ``"HCA project: "`` prefix.
        * ``releaseDate`` is truncated at the first ``T``.
        * ``contacts`` lists contributors whose role mentions neither
          "wrangler" nor "curator"; ``contact_name`` must be
          ``first,middle,last``.
        * ``publications`` and ``funders`` are copied from the project content.

        Raises:
            ConversionError: when a contributor's contact name does not have
                three comma-separated parts.
            KeyError: when ``hca_data['project']['content']`` is missing.
        """
        # TODO BioStudies minimum length
        MIN_LEN = 25
        DELIM = ' , '

        if len(extracted_data["title"]) < MIN_LEN:
            extracted_data["title"] = "HCA project: " + extracted_data["title"]

        extracted_data["releaseDate"] = extracted_data["releaseDate"].split('T')[0]

        content = hca_data['project']['content']

        contacts = []
        for contributor in content.get('contributors', []):
            project_role = contributor.get("project_role", "")
            if "wrangler" in project_role or "curator" in project_role:
                continue

            contact_name = contributor.get("contact_name", "")
            names = contact_name.split(',', 2)
            if len(names) != 3:
                # ConversionError needs both an expression and a message, and
                # the message must be an f-string to include the name.
                raise ConversionError(
                    "Project Conversion Error",
                    f"HCA Contributor contact name, {contact_name}, couldn't be parsed.",
                    details={'contact_name': contact_name})

            first, middle, last = names
            contacts.append({
                "orcid": contributor.get("orcid_id", ""),
                "firstName": first,
                # Only the first character of the middle name is kept.
                "middleInitials": middle[0] if middle else '',
                "lastName": last,
                "email": contributor.get("email", ""),
                "address": contributor.get("address", ""),
                "affiliation": contributor.get("institution", ""),
                "phone": contributor.get("phone", ""),
            })
        extracted_data["contacts"] = contacts

        extracted_data["publications"] = [
            {
                "pubmedId": hca_publication.get("pmid", ""),
                "doi": hca_publication.get("doi", ""),
                "articleTitle": hca_publication.get("publication_title", ""),
                "authors": DELIM.join(hca_publication.get("authors", []))
            }
            for hca_publication in content.get('publications', [])
        ]

        extracted_data["funders"] = [
            {
                "grantTitle": hca_funder.get("grant_title", ""),
                "grantId": hca_funder.get("grant_id", ""),
                "organization": hca_funder.get("organization", "")
            }
            for hca_funder in content.get('funders', [])
        ]

        return extracted_data
class StudyConverter(Converter):
    """Converter producing a study document from project metadata."""

    def __init__(self, ontology_api):
        super().__init__(ontology_api)
        self.logger = logging.getLogger(__name__)
        self.alias_prefix = 'study_'
        self.remove_input_prefix = True
        self.field_mapping = {
            "project__uuid__uuid": "alias",
            "project__content__project_core__project_title": "title",
            "project__content__project_core__project_description": "description"
        }
        self.exclude_data = ['contributors', 'publications']
        self.exclude_fields_match = [
            '__schema_type',
            '__describedBy',
            '__contributors',
            '__publications',
            '__funders',
        ]

    def _build_output(self, extracted_data, flattened_hca_data, hca_data=None):
        """Add the study type, abstract and project link; return ``extracted_data``.

        The abstract is a copy of the ``description`` field.
        """
        # A missing or empty attribute mapping is replaced by a fresh dict.
        study_attributes = extracted_data.get("attributes") or {}
        extracted_data["attributes"] = study_attributes

        study_attributes["study_type"] = [dict(value="Transcriptome Analysis")]
        abstract = extracted_data['description']
        study_attributes["study_abstract"] = [dict(value=abstract)]

        self._build_links(extracted_data, {})
        return extracted_data

    def _build_links(self, extracted_data, links):
        """Set the placeholder project reference (``links`` is unused)."""
        extracted_data["projectRef"] = {"alias": "{projectAlias.placeholder}"}
class ConversionError(Exception):
    """Raised when metadata cannot be converted.

    Attributes:
        expression: short label identifying the failing conversion.
        message: human-readable explanation of the failure.
        details: optional extra context given by the raiser; ``None`` when
            not provided.
    """

    def __init__(self, expression, message, details=None):
        # Initialise the base class explicitly so ``args`` is well defined
        # as ``(expression, message)``.
        super().__init__(expression, message)
        self.expression = expression
        self.message = message
        self.details = details

    def __str__(self):
        """Return ``"<expression>: <message>"`` for logs and tracebacks."""
        return f"{self.expression}: {self.message}"