HumanCellAtlas/ingest-archiver

View on GitHub
archiver/archiver.py

Summary

Maintainability
D
2 days
Test Coverage
import json
import logging
import sys

import polling as polling

import config

from archiver.converter import ConversionError, SampleConverter, ProjectConverter, \
    SequencingExperimentConverter, SequencingRunConverter, StudyConverter

from archiver import util


def _print_same_line(string):
    print(f'\r{string}', end='')


class ArchiverError(Exception):
    """Base-class for all exceptions raised by this module."""


class ArchiveEntity:
    def __init__(self):
        self.data = {}
        self.conversion = {}
        self.errors = []
        self.warnings = []
        self.id = None
        self.archive_entity_type = None
        self.accession = None
        self.usi_json = None
        self.usi_current_version = None
        self.links = {}
        self.bundle_uuid = None

    def __str__(self):
        return str(vars(self))


class ArchiveEntityMap:
    def __init__(self):
        self.entities_dict_type = {}

    def add_entities(self, entities):
        for entity in entities:
            self.add_entity(entity)

    def add_entity(self, entity: ArchiveEntity):
        if not self.entities_dict_type.get(entity.archive_entity_type):
            self.entities_dict_type[entity.archive_entity_type] = {}
        self.entities_dict_type[entity.archive_entity_type][entity.id] = entity

    def get_entity(self, entity_type, archive_entity_id):
        if self.entities_dict_type.get(entity_type):
            return self.entities_dict_type[entity_type].get(archive_entity_id)
        return None

    def get_converted_entities(self):
        for entities_dict in self.entities_dict_type.values():
            for entity in entities_dict.values():
                if entity.conversion and not entity.errors:
                    yield entity

    def get_conversion_summary(self):
        summary = {}
        for entities_dict in self.entities_dict_type.values():
            for entity in entities_dict.values():
                if entity.conversion and not entity.errors:
                    if not summary.get(entity.archive_entity_type):
                        summary[entity.archive_entity_type] = 0
                    summary[entity.archive_entity_type] = summary[entity.archive_entity_type] + 1
        return summary

    def find_entity(self, alias):
        for entities_dict in self.entities_dict_type.values():
            if entities_dict.get(alias):
                return entities_dict.get(alias)
        return None

    def get_entities(self):
        entities = []
        for entity_type, entities_dict in self.entities_dict_type.items():
            if not entities_dict:
                continue
            for alias, entity in entities_dict.items():
                entities.append(entity)
        return entities

    def update(self, entity_type, entities: dict):
        if not self.entities_dict_type.get(entity_type):
            self.entities_dict_type[entity_type] = {}
        self.entities_dict_type[entity_type].update(entities)


class AssayBundle:
    def __init__(self, ingest_api, bundle_uuid):
        self.ingest_api = ingest_api

        self.bundle_uuid = bundle_uuid
        self.bundle_manifest = None

        self.project = None
        self.biomaterials = None
        self.files = None
        self.assay_process = None
        self.library_preparation_protocol = None
        self.sequencing_protocol = None
        self.input_biomaterial = None

    def get_bundle_manifest(self):
        if not self.bundle_manifest:
            bundle_uuid = self.bundle_uuid
            self.bundle_manifest = self.ingest_api.get_bundle_manifest(bundle_uuid)

        return self.bundle_manifest

    def get_project(self):
        if not self.project:
            bundle_manifest = self.get_bundle_manifest()
            project_uuid = list(bundle_manifest['fileProjectMap'].keys())[0]  # TODO one project per bundle
            self.project = self.ingest_api.get_project_by_uuid(project_uuid)

        return self.project

    def get_biomaterials(self):
        if not self.biomaterials:
            self.biomaterials = self._init_biomaterials()

        return self.biomaterials

    def get_assay_process(self):
        bundle_manifest = self.get_bundle_manifest()
        file_uuid = list(bundle_manifest['fileFilesMap'].keys())[0]

        file = self.ingest_api.get_file_by_uuid(file_uuid)
        derived_by_processes = self.ingest_api.get_related_entity(file, 'derivedByProcesses', 'processes')

        if derived_by_processes:
            self.assay_process = derived_by_processes[0]

            if len(derived_by_processes) > 1:
                raise ArchiverError(f'Bundle {self.bundle_uuid} has many assay processes.')

        return self.assay_process

    def get_library_preparation_protocol(self):
        if not self.library_preparation_protocol:
            self._init_protocols()
        return self.library_preparation_protocol

    def get_sequencing_protocol(self):
        if not self.sequencing_protocol:
            self._init_protocols()
        return self.sequencing_protocol

    def get_files(self):
        if not self.files:
            assay = self.get_assay_process()
            self.files = self.ingest_api.get_related_entity(assay, 'derivedFiles', 'files')

        return self.files

    def get_input_biomaterial(self):
        if not self.input_biomaterial:
            self.input_biomaterial = self._retrieve_input_biomaterial()

        return self.input_biomaterial

    def _init_protocols(self):
        assay = self.get_assay_process()
        protocols = self.ingest_api.get_related_entity(assay, 'protocols', 'protocols')
        protocol_by_type = {}
        for protocol in protocols:
            concrete_entity_type = self.ingest_api.get_concrete_entity_type(protocol)
            if not protocol_by_type.get(concrete_entity_type):
                protocol_by_type[concrete_entity_type] = []

            protocol_by_type[concrete_entity_type].append(protocol)

        library_preparation_protocols = protocol_by_type.get('library_preparation_protocol', [])
        sequencing_protocols = protocol_by_type.get('sequencing_protocol', [])

        if len(library_preparation_protocols) != 1:
            raise ArchiverError('There should be 1 library preparation protocol for the assay process.')

        if len(sequencing_protocols) != 1:
            raise ArchiverError('There should be 1 sequencing_protocol for the assay process.')

        self.library_preparation_protocol = library_preparation_protocols[0]
        self.sequencing_protocol = sequencing_protocols[0]

    def _init_biomaterials(self):
        bundle_manifest = self.get_bundle_manifest()
        for biomaterial_uuid in bundle_manifest['fileBiomaterialMap'].keys():
            yield self.ingest_api.get_biomaterial_by_uuid(biomaterial_uuid)

    def _retrieve_input_biomaterial(self):
        assay = self.get_assay_process()
        input_biomaterials = self.ingest_api.get_related_entity(assay, 'inputBiomaterials', 'biomaterials')

        if not input_biomaterials:
            raise ArchiverError('No input biomaterial found to the assay process.')

        # TODO get first for now, clarify if it's possible to have multiple and how to specify the links
        return input_biomaterials[0]


class ArchiveSubmission:
    def __init__(self, usi_api, usi_submission_url=None):
        self.usi_submission = {}
        self.errors = list()
        self.processing_result = list()
        self.validation_result = list()
        self.is_completed = False
        self.converted_entities = list()
        self.entity_map = ArchiveEntityMap()
        self.usi_api = usi_api
        self.file_upload_info = list()
        self.accession_map = None
        self.invalid = False
        self.status = None

        if usi_submission_url:
            self.usi_submission = self.usi_api.get_submission(usi_submission_url)
            self.status = self.get_status()

    def get_status(self):
        get_status_url = self.usi_submission['_links']['submissionStatus']['href']
        submission_status = self.usi_api.get_submission_status(get_status_url)
        state = submission_status.get('status')
        return state

    def __str__(self):
        return str(vars(self))

    def add_entities(self, converted_entities):
        get_contents_url = self.usi_submission['_links']['contents']['href']
        contents = self.usi_api.get_contents(get_contents_url)

        for entity in converted_entities:
            entity_link = self.usi_api.get_entity_url(entity.archive_entity_type)
            create_entity_url = contents['_links'][f'{entity_link}:create']['href']

            created_entity = self.usi_api.create_entity(create_entity_url, entity.conversion)
            entity.usi_json = created_entity

    def validate(self):
        if not self.usi_submission:
            return self

        is_validated = False
        try:
            is_validated = polling.poll(
                lambda: self.is_validated(),
                step=config.VALIDATION_POLLING_STEP,
                timeout=config.VALIDATION_POLLING_TIMEOUT if not config.VALIDATION_POLL_FOREVER else None,
                poll_forever=True if config.VALIDATION_POLL_FOREVER else False
            )
        except polling.TimeoutException as te:
            self.errors.append({
                "error_message": "USI validation takes too long to complete.",
            })

        if is_validated and self.get_all_validation_errors():
            validation_summary = self.get_all_validation_result_details()
            self.errors.append({
                "error_message": "Failed in USI validation.",
                "details"      : {
                    "usi_validation_errors": self.get_all_validation_errors()
                }
            })
            self.validation_result = validation_summary
            return self

        return self

    def validate_and_submit(self):
        if not self.usi_submission:
            return self

        print("Waiting for the submission to be validated in USI...")

        is_validated = False
        try:
            is_validated = polling.poll(
                lambda: self.is_ready_to_submit(),
                step=config.VALIDATION_POLLING_STEP,
                timeout=config.VALIDATION_POLLING_TIMEOUT if not config.VALIDATION_POLL_FOREVER else None,
                poll_forever=True if config.VALIDATION_POLL_FOREVER else False
            )
        except polling.TimeoutException as te:
            self.errors.append({
                "error_message": "USI validation takes too long to complete.",
            })

        if is_validated and self.get_all_validation_errors():
            validation_summary = self.get_all_validation_result_details()
            self.validation_result = validation_summary
            self.errors.append({
                "error_message": "Failed in USI validation.",
                "details": {
                    "usi_validation_errors": self.get_all_validation_errors()
                }
            })
            self.invalid = True

        if self.is_submittable():
            self.submit()

        return self

    def submit(self):
        self.usi_api.update_submission_status(self.usi_submission, 'Submitted')

        print("USI Submission is submitted! Waiting for the submission result. Please do not submit again.")

        try:
            self.is_completed = polling.poll(
                lambda: self.is_processing_complete(),
                step=config.SUBMISSION_POLLING_STEP,
                timeout=config.SUBMISSION_POLLING_TIMEOUT if not config.SUBMISSION_POLL_FOREVER else None,
                poll_forever=True if config.SUBMISSION_POLL_FOREVER else False
            )

            self.process_result()

        except polling.TimeoutException:
            self.errors.append({
                "error_message": "USI submission takes too long to complete.",
            })

    def process_result(self):
        self.processing_result = self.get_processing_results()
        accession_map = {}
        for result in self.processing_result:
            if result['status'] == 'Completed':
                alias = result['alias']
                accession = result['accession']
                accession_map[alias] = accession
            elif result['status'] == 'Error':
                self.errors.append(f"There was an error submitting a "
                                   f"{result.get('submittableType','') } with alias {result.get('alias','')} to "
                                   f"{result.get('archive','')}.")
        self.accession_map = accession_map

        return self

    def is_ready_to_submit(self):
        is_validated = self.is_validated()

        if is_validated:
            is_submittable = self.is_submittable()
            if is_submittable:
                return True
            else:
                errors = self.get_all_validation_errors()
                self.validation_result = errors
                print("####################### VALIDATION ERRORS")
                print(json.dumps(errors, indent=4))
                print("####################### SUBMISSION REPORT")
                print(json.dumps(self.generate_report(), indent=4))

        return False

    def get_all_validation_result_details(self):
        get_validation_results_url = self.usi_submission['_links']['validationResults']['href']
        validation_results = self.usi_api.get_validation_results(get_validation_results_url)

        summary = []
        for validation_result in validation_results:
            if validation_result['validationStatus'] == "Complete":
                details_url = validation_result['_links']['validationResult']['href']
                # TODO fix how what to put as projection param, check usi documentation, removing any params for now
                details_url = details_url.split('{')[0]
                validation_result_details = self.usi_api.get_validation_result_details(details_url)
                summary.append(validation_result_details)

        return summary

    def get_all_validation_errors(self):
        get_validation_results_url = self.usi_submission['_links']['validationResults']['href']
        validation_results = self.usi_api.get_validation_results(get_validation_results_url)

        errors = []
        for validation_result in validation_results:
            if validation_result['validationStatus'] == "Complete":
                details_url = validation_result['_links']['validationResult']['href']
                # TODO fix how what to put as projection param, check usi documentation, removing any params for now
                details_url = details_url.split('{')[0]
                validation_result_details = self.usi_api.get_validation_result_details(details_url)
                if validation_result_details.get('errorMessages'):
                    errors.append(validation_result_details.get('errorMessages'))

        return errors

    def is_submittable(self):
        get_status_url = self.usi_submission['_links']['submissionStatus']['href']
        submission_status = self.usi_api.get_submission_status(get_status_url)

        get_available_statuses_url = submission_status['_links']['availableStatuses']['href']
        available_statuses = self.usi_api.get_available_statuses(get_available_statuses_url)

        for status in available_statuses:
            if status['statusName'] == 'Submitted':
                return True

        return False

    def is_validated(self):
        get_validation_results_url = self.usi_submission['_links']['validationResults']['href']
        validation_results = self.usi_api.get_validation_results(get_validation_results_url)

        for validation_result in validation_results:
            if validation_result['validationStatus'] != "Complete":
                return False

        return True

    def is_validated_and_submittable(self):
        return self.is_validated(self.usi_submission) and self.is_submittable(self.usi_submission)

    def is_processing_complete(self):
        results = self.usi_api.get_processing_results(self.usi_submission)
        for result in results:
            if result['status'] != "Completed" and result['status'] != "Error":
                return False

        return True

    def delete_submission(self):
        delete_url = self.usi_submission['_links']['self:delete']['href']
        return self.usi_api.delete_submission(delete_url)

    def get_processing_results(self):
        return self.usi_api.get_processing_results(self.usi_submission)

    def get_url(self):
        # TODO remove projection placeholder
        if self.usi_submission:
            return self.usi_submission['_links']['self']['href'].split('{')[0]

        return None

    def generate_report(self):
        report = {}
        report['completed'] = self.is_completed
        report['submission_errors'] = self.errors
        report['file_upload_info'] = self.file_upload_info

        if self.usi_submission:
            report['submission_url'] = self.get_url()

        entities = {}
        for entity in self.entity_map.get_entities():
            entities[entity.id] = {}
            entities[entity.id]['errors'] = entity.errors
            entities[entity.id]['accession'] = entity.accession
            entities[entity.id]['warnings'] = entity.warnings
            entities[entity.id]['converted_data'] = entity.conversion

            if entity.usi_json:
                entities[entity.id]['entity_url'] = entity.usi_json['_links']['self']['href']

        report['entities'] = entities
        report['accessions'] = self.accession_map

        if self.entity_map:
            report['conversion_summary'] = self.entity_map.get_conversion_summary()

        return report


class IngestArchiver:
    def __init__(self, ingest_api, usi_api, ontology_api, exclude_types=None, alias_prefix=None):
        self.logger = logging.getLogger(__name__)
        self.ingest_api = ingest_api
        self.exclude_types = exclude_types if exclude_types else []
        self.alias_prefix = f"{alias_prefix}_" if alias_prefix else ""
        self.ontology_api = ontology_api
        self.usi_api = usi_api

        self.converter = {
            "project": ProjectConverter(ontology_api=ontology_api),
            "sample": SampleConverter(ontology_api=ontology_api),
            "study": StudyConverter(ontology_api=ontology_api),
            "sequencingRun": SequencingRunConverter(ontology_api=ontology_api),
            "sequencingExperiment": SequencingExperimentConverter(ontology_api=ontology_api)
        }

        self.converter['sample'].ingest_api = self.ingest_api

    def archive(self, entity_map: ArchiveEntityMap):
        archive_submission = self.archive_metadata(entity_map)
        self.notify_file_archiver(archive_submission)
        archive_submission.validate_and_submit()
        return archive_submission

    def archive_metadata(self, entity_map: ArchiveEntityMap):
        archive_submission = ArchiveSubmission(usi_api=self.usi_api)
        archive_submission.entity_map = entity_map

        converted_entities = list(entity_map.get_converted_entities())

        if converted_entities:
            archive_submission.converted_entities = converted_entities
            archive_submission.usi_submission = self.usi_api.create_submission()
            print(f"USI SUBMISSION: {archive_submission.get_url()}")
            archive_submission.add_entities(archive_submission.converted_entities)
        else:
            archive_submission.is_completed = True
            archive_submission.errors.append({
                "error_message": "No entities found to submit."
            })
            return archive_submission

        return archive_submission

    def complete_submission(self, usi_submission_url):
        archive_submission = ArchiveSubmission(usi_api=self.usi_api, usi_submission_url=usi_submission_url)

        if archive_submission.status == 'Draft':
            archive_submission.validate_and_submit()
        elif archive_submission.status == 'Completed':
            archive_submission.is_completed = True
            archive_submission.process_result()

        return archive_submission

    def get_assay_bundle(self, bundle_uuid):
        return AssayBundle(ingest_api=self.ingest_api, bundle_uuid=bundle_uuid)

    def convert(self, bundles):
        entity_map = ArchiveEntityMap()
        for idx, bundle_uuid in enumerate(bundles):
            print(f'\n* PROCESSING BUNDLE {idx + 1}/{len(bundles)}: {bundle_uuid}')
            assay_bundle = self.get_assay_bundle(bundle_uuid)
            entities = self._convert(assay_bundle)
            entity_map.add_entities(entities)
        return entity_map

    def _convert(self, assay_bundle: AssayBundle):
        aggregator = ArchiveEntityAggregator(assay_bundle, alias_prefix=self.alias_prefix)

        entities = []
        for archive_entity_type in ["project", "study", "sample", "sequencingExperiment", "sequencingRun"]:
            print(f"Finding {archive_entity_type} entities in bundle...")
            progress_ctr = 0

            if self.exclude_types and archive_entity_type in self.exclude_types:
                print(f"Skipping {archive_entity_type} entities in bundle...")
                continue

            for archive_entity in aggregator.get_archive_entities(archive_entity_type):
                progress_ctr = progress_ctr + 1
                _print_same_line(str(progress_ctr))

                converter = self.converter[archive_entity_type]

                current_version = self.usi_api.get_current_version(archive_entity.archive_entity_type,
                                                                   archive_entity.id)
                if current_version and current_version.get('accession'):
                    archive_entity.accession = current_version.get('accession')
                    archive_entity.errors.append({
                        "error_message": f"This alias has already been submitted to USI, accession: {archive_entity.accession}.",
                        "details": {
                            "current_version": current_version["_links"]["self"]["href"]
                        }
                    })
                elif current_version and not current_version.get('accession'):
                    archive_entity.errors.append({
                        "error_message": f'This alias has already been submitted to USI, but still has no accession.',
                        "details": {
                            "current_version": current_version["_links"]["self"]["href"]
                        }
                    })

                elif IngestArchiver.is_metadata_accessioned(archive_entity):
                    archive_entity.errors.append({
                        "error_message": 'Metadata already have an accession'
                    })
                else:
                    try:
                        archive_entity.conversion = converter.convert(archive_entity.data)
                        archive_entity.conversion['alias'] = archive_entity.id
                        archive_entity.conversion.update(archive_entity.links)

                    except ConversionError as e:
                        archive_entity.errors.append({
                            "error_message": f'An error occured converting data to a {archive_entity_type}: {str(e)}.',
                            "details": {"data": json.loads(archive_entity.data)}
                        })

                entities.append(archive_entity)
            print("")

        return entities

    # TODO save notification to file for now, should be sending to rabbit mq in the future
    def notify_file_archiver(self, archive_submission: ArchiveSubmission):

        messages = []
        # TODO a bit redundant with converter, refactor this
        for entity in archive_submission.converted_entities:
            if entity.archive_entity_type == 'sequencingRun':
                data = entity.data
                files = []

                for file in data.get('files'):
                    obj = {
                        # required fields
                        "name": file['content']['file_core']['file_name'],
                        "read_index": file['content']['read_index'],
                     }
                    files.append(obj)

                message = {
                    "usi_api_url": self.usi_api.url,
                    "ingest_api_url": self.ingest_api.url,
                    "submission_url": archive_submission.get_url(),
                    "files": files,
                    "bundle_uuid": entity.bundle_uuid
                }

                if util.is_10x(data.get("library_preparation_protocol")):
                    message["conversion"] = {}
                    message["conversion"]["output_name"] = f"{data['bundle_uuid']}.bam"
                    message["conversion"]["inputs"] = files
                    message["files"] = [f"{data['bundle_uuid']}.bam"]

                messages.append(message)

        archive_submission.file_upload_info = messages
        return messages

    @staticmethod
    def is_metadata_accessioned(entity: ArchiveEntity):
        if entity.archive_entity_type != "sample":
            return False

        sample = entity.data.get("biomaterial")
        if sample:
            return ("biomaterial_core" in sample["content"]) and ("biosd_biomaterial" in sample["content"]["biomaterial_core"])

        return False


class ArchiveEntityAggregator:
    def __init__(self, assay_bundle: AssayBundle, alias_prefix):
        self.assay_bundle = assay_bundle
        self.alias_prefix = alias_prefix

    def _get_projects(self):
        project = self.assay_bundle.get_project()
        if not project:
            return []
        archive_entity = ArchiveEntity()
        archive_type = "project"
        archive_entity.archive_entity_type = archive_type
        archive_entity.id = self.generate_archive_entity_id(archive_type, project)
        archive_entity.data = {"project": project}
        archive_entity.bundle_uuid = self.assay_bundle.bundle_uuid
        return [archive_entity]

    def _get_studies(self):
        project = self.assay_bundle.get_project()
        if not project:
            return []
        archive_entity = ArchiveEntity()
        archive_entity.bundle_uuid = self.assay_bundle.bundle_uuid
        archive_type = "study"
        archive_entity.archive_entity_type = archive_type
        archive_entity.id = self.generate_archive_entity_id(archive_type, project)
        archive_entity.data = {"project": project}
        archive_entity.links = {
            "projectRef": {
                "alias": self.generate_archive_entity_id('project', project)
            }
        }
        return [archive_entity]

    def _get_samples(self):
        samples = []
        for biomaterial in self.assay_bundle.get_biomaterials():
            archive_entity = ArchiveEntity()
            archive_entity.bundle_uuid = self.assay_bundle.bundle_uuid
            archive_type = "sample"
            archive_entity.archive_entity_type = archive_type
            archive_entity.id = self.generate_archive_entity_id(archive_type, biomaterial)
            archive_entity.data = {"biomaterial": biomaterial}
            samples.append(archive_entity)
        return samples

    def _get_sequencing_experiments(self):
        assay_bundle = self.assay_bundle
        process = assay_bundle.get_assay_process()
        if not process:
            return []
        input_biomaterial = assay_bundle.get_input_biomaterial()

        archive_entity = ArchiveEntity()
        archive_entity.bundle_uuid = self.assay_bundle.bundle_uuid
        archive_type = "sequencingExperiment"
        archive_entity.archive_entity_type = archive_type
        archive_entity.id = self.generate_archive_entity_id(archive_type, process)
        archive_entity.data = {
            'process': process,
            'library_preparation_protocol': assay_bundle.get_library_preparation_protocol(),
            'sequencing_protocol': assay_bundle.get_sequencing_protocol(),
            'input_biomaterial': input_biomaterial
        }

        links = {}
        links['studyRef'] = {
            "alias": self.generate_archive_entity_id('study', assay_bundle.get_project())
        }
        links['sampleUses'] = []
        sample_ref = {
            'sampleRef': {
                "alias": self.generate_archive_entity_id('sample', input_biomaterial)
            }
        }
        links['sampleUses'].append(sample_ref)

        archive_entity.links = links

        return [archive_entity]

    def _get_sequencing_runs(self):
        assay_bundle = self.assay_bundle
        process = assay_bundle.get_assay_process()
        archive_entity = ArchiveEntity()
        archive_entity.bundle_uuid = self.assay_bundle.bundle_uuid
        archive_type = "sequencingRun"
        archive_entity.archive_entity_type = archive_type
        archive_entity.id = self.generate_archive_entity_id(archive_type, process)
        archive_entity.data = {
            'library_preparation_protocol': assay_bundle.get_library_preparation_protocol(),
            'process': assay_bundle.get_assay_process(),
            'files': assay_bundle.get_files(),
            'bundle_uuid': assay_bundle.bundle_uuid
        }
        archive_entity.links = {
            'assayRefs': [{
                "alias": self.generate_archive_entity_id('sequencingExperiment', process)
            }]
        }
        return [archive_entity]

    def get_archive_entities(self, archive_entity_type):
        entities = []
        if archive_entity_type == "project":
            entities = self._get_projects()
        elif archive_entity_type == "study":
            entities = self._get_studies()
        elif archive_entity_type == "sample":
            entities = self._get_samples()
        elif archive_entity_type == "sequencingExperiment":
            entities = self._get_sequencing_experiments()
        elif archive_entity_type == "sequencingRun":
            entities = self._get_sequencing_runs()
        return entities

    def generate_archive_entity_id(self, archive_entity_type, entity):
        uuid = entity["uuid"]["uuid"]
        return f"{self.alias_prefix}{archive_entity_type}_{uuid}"