datalad/datalad-neuroimaging

View on GitHub
datalad_neuroimaging/bids2scidata.py

Summary

Maintainability
F
3 days
Test Coverage
# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the datalad package for the
#   copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""generate metadata for submission to Scientific Data from a BIDS dataset"""

__docformat__ = 'restructuredtext'


import logging
lgr = logging.getLogger('datalad.neuroimaging.bids2scidata')

from collections import OrderedDict
import os
import re
from datetime import datetime
from io import open
from os.path import exists
from os.path import relpath
from os.path import abspath
from os.path import dirname
from os.path import join as opj
from os.path import split as psplit

import datalad
import datalad_neuroimaging
from datalad import cfg
from datalad.distribution.dataset import require_dataset
from datalad.interface.base import Interface
from datalad.interface.base import build_doc
from datalad.support.param import Parameter
from datalad.distribution.dataset import datasetmethod
from datalad.interface.utils import eval_results
from datalad.distribution.dataset import EnsureDataset
from datalad.support.constraints import EnsureNone
from datalad.utils import assure_list
from datalad_deprecated.metadata.metadata import Metadata
from six.moves.urllib.parse import urlsplit
from six.moves.urllib.parse import urlunsplit
from posixpath import split as posixsplit


# Adapt to metadata code move from datalad-core to datalad-deprecated
metadata = Metadata.__call__


try:
    import pandas as pd
except ImportError:
    pd = None

# regex for a cheap test if something looks like a URL
r_url = re.compile(r"^https?://")

common_sample_props = {
    "Characteristics[organism]": "homo sapiens",
    "Characteristics[organism part]": "brain",
    "Protocol REF": "Participant recruitment",
}

# standardize columns from participants.tsv
sample_property_name_map = {
    "age": "Characteristics[age at scan]",
    "age(years)": "Characteristics[age at scan]",
    "gender": "Characteristics[sex]",
    "handedness": "Characteristics[handedness]",
    "participant_id": "Sample Name",
    "id": "Sample Name",
    "sex": "Characteristics[sex]",
}

# map datalad ontology prefixes to ontology labels recognized by scidata
ontology_map = {
    'uo': 'UO',
    'pato': 'PATO',
}

# this defines both the mapping and order of assay properties as they
# will be presented in an assay table
# case 0: any term is properly defined in the metadata context
# (datalad_term, isatab_term)
# if datalad term is None, it needs to come from somewhere else -> special case
# if datalad term is '', it comes from the filename
# case 1: the definition for a term value comes from another metadata field
# (datalad_term, isatab_term, datalad_valuedef
# case 2: we take the value as-is and define a unit for it
# (datalad_term, isatab_term, isatab_unitvalue, isatab_unitdef
recognized_assay_props = (
    (('bids', 'participant_id'), "Sample Name"),
    (None, "Protocol REF"),
    # BIDS repetition time by default, but override with info from
    # file if possible
    (("bids", "RepetitionTime"), "Parameter Value[4d spacing]"),
    (("nifti1", "temporal_spacing(s)"), 'Parameter Value[4d spacing]'),
    (("nifti1", "spatial_resolution(mm)"), "Parameter Value[resolution]"),
    (("bids", "EchoTime"), "Parameter Value[echo time]", "second", 'UO:0000010'),
    (("bids", "FlipAngle"), "Parameter Value[flip angle]", "degree", 'UO:0000185'),
    ('', "Parameter Value[modality]"),
    (("bids", "Manufacturer"), "Parameter Value[instrument manufacturer]"),
    (("bids", "ManufacturerModelName"), 'Parameter Value[instrument name]'),
    (("bids", "HardcopyDeviceSoftwareVersion"), 'Parameter Value[instrument software version]'),
    (("bids", "MagneticFieldStrength"), 'Parameter Value[magnetic field strength]', 'tesla', 'UO:0000228'),
    (("bids", "ReceiveCoilName"), "Parameter Value[coil type]"),
    (("bids", "PulseSequenceType"), "Parameter Value[sequence]"),
    (("bids", "ParallelAcquisitionTechnique"), "Parameter Value[parallel acquisition technique]"),
    ('', "Assay Name"),
    ('', "Raw Data File"),
    (None, "Comment[Data Repository]"),
    (None, "Comment[Data Record Accession]"),
    (None, "Comment[Data Record URI]"),
    (("bids", "TaskName"), "Factor Value[task]", "bids:CogAtlasID"),
)

# functors to send a sequence of stringified sequence elements to
# build the table representation of a value
repr_props = {
    "Parameter Value[resolution]": 'x'.join,
}

# properties of assay tables to track
assay_props = (
    'assay_fname',
    'assay_techtype', 'assay_techtype_term', 'assay_techtype_termsrc',
    'assay_measurementtype', 'assay_measurementtype_term',
    'assay_measurementtype_termsrc',
)

# properties of study protocols to track
protocol_props = (
    'protocol_name',
    'protocol_type',
    'protocol_term',
    'protocol_termsrc',
    'protocol_parameters',
)

protocol_defs = {
    'Participant recruitment': {
        'type': 'selection',
        'term': 'OBI:0001928',
        'termsrc': 'OBI'},
    'Magnetic Resonance Imaging': {
        'type': 'nuclear magnetic resonance',
        'term': 'OBI:0000182',
        'termsrc': 'OBI'},
}


def getprop(obj, path, default):
    """Helper to get a property from a metadata structure that might not be there"""
    for p in path:
        if p not in obj:
            return default
        obj = obj[p]
    return obj


def split_term_source_accession(val):
    if val is None:
        return '', ''
    if not r_url.match(val):
        # no URL
        if ':' in val:
            val_l = val.split(':')
            vocab = ontology_map.get(val_l[0], val_l[0])
            return vocab, '{}:{}'.format(vocab, val[len(val_l[0]) + 1:])
        else:
            # no idea
            lgr.warn("Could not identify term source REF in: '%s'", val)
            return '', val
    else:
        try:
            # this is a URL, assume simple: last path segment is accession id
            url_s = urlsplit(val)
            urlpath, accession = posixsplit(url_s.path)
            term_source = urlunsplit((url_s[0], url_s[1], urlpath, url_s[3], url_s[4]))
            return ontology_map.get(term_source, term_source), accession
        except Exception as e:
            lgr.warn("Could not identify term source REF in: '%s' [%s]", val, exc_str(e))
            return '', val


def _get_study_df(dsmeta):
    # TODO use helper
    participants = getprop(
        dsmeta,
        ["metadata", "datalad_unique_content_properties", "bids", "subject"],
        [])
    sample_info = [
        dict(
            common_sample_props,
            **{sample_property_name_map.get(
                k,
                "Comment[{}]".format(k.lower())): v
               for k, v in p.items()})
        for p in participants]
    columns_addon = set()
    for i in sample_info:
        # we use the participant ID as the source name
        i["Source Name"] = i.get("Sample Name", None)
        columns_addon.update(i.keys())
    # convert to data frame, impose strict order
    column_order = [
        "Source Name",
        "Characteristics[organism]",
        "Characteristics[organism part]",
        "Protocol REF",
        "Sample Name"
    ]
    df = pd.DataFrame(
        sample_info,
        columns=column_order + sorted(c for c in columns_addon if c not in column_order))
    if 'Sample Name' in df:
        df = df.sort_values(['Sample Name'])
    return df


def _describe_file(dsmeta, fmeta):
    meta = getprop(fmeta, ['metadata'], {})
    bidsmeta = getprop(meta, ['bids'], {})
    suffix = getprop(bidsmeta, ['suffix'], None)
    if suffix is None:
        raise ValueError('file record has no type info, not sure what this is')
    info = {
        'Sample Name': getprop(bidsmeta, ['subject', 'id'], None),
        # assay name is the entire filename except for the modality suffix
        # so that, e.g. simultaneous recordings match wrt to the assay name
        # across assay tables
        'Assay Name': psplit(fmeta['path'])[-1].split('.')[0][:-(len(suffix) + 1)],
        'Raw Data File': relpath(fmeta['path'], start=fmeta['parentds']),
        'Parameter Value[modality]': suffix,
    }
    for l in ('rec', 'recording'):
        if l in bidsmeta:
            info['Parameter Value[recording label]'] = bidsmeta[l]
    for l in ('acq', 'acquisition'):
        if l in bidsmeta:
            info['Parameter Value[acquisition label]'] = bidsmeta[l]
    if 'task' in bidsmeta:
        info['Factor Value[task]'] = bidsmeta['task']

    # now pull in the value of all recognized properties
    # perform any necessary conversion to achieve final
    # form for ISATAB table
    for prop in recognized_assay_props:
        src, dst = prop[:2]
        # expand src specfification
        namespace, src = src if src else (None, src)
        if src is None:
            # special case, not handled here
            continue
        if src in meta.get(namespace, {}):
            # pull out the value from datalad metadata directly,
            # unless we have a definition URL in another field,
            # in which case put it into a 2-tuple also
            val = meta[namespace][src]
            if dst in repr_props and isinstance(val, (list, tuple)):
                val = repr_props[dst](str(i) for i in val)
            info[dst] = val
            if len(prop) == 4:
                # we have a unit definition
                info['{}_unit_label'.format(dst)] = prop[2]
            term_source = term_accession = None
            if len(prop) > 2:
                term_source, term_accession = split_term_source_accession(
                    meta.get(prop[2], None) if len(prop) == 3 else prop[3])
            elif prop[0] and prop[1] != 'Sample Name':
                # exclude all info source outside the metadata
                # this plugin has no vocabulary info for this field
                # we need to look into the dataset context to see if we know
                # anything
                # FIXME TODO the next line should look into the local context
                # of the 'bids' metadata source
                termdef = getprop(dsmeta, ['metadata', namespace, '@context', src], {})
                term_source, term_accession = split_term_source_accession(
                    termdef.get('unit' if 'unit' in termdef else '@id', None))
                if 'unit_label' in termdef:
                    info['{}_unit_label'.format(dst)] = termdef['unit_label']
            if term_accession is not None:
                info['{}_term_source'.format(dst)] = term_source
                info['{}_term_accession'.format(dst)] = term_accession

    return info


def _get_colkey(idx, colname):
    return '{0:0>5}_{1}'.format(idx, colname)


def _get_assay_df(
        dsmeta,
        modality, protocol_ref, files, file_descr,
        repository_info=None):
    if not repository_info:
        repository_info = {}
    # main assays
    # we cannot use a dict to collect the data before going to
    # a data frame, because we will have multiple columns with
    # the same name carrying the ontology info for preceding
    # columns
    # --> prefix with some index, create dataframe and rename
    # the column names in the dataframe with the prefix stripped
    assay_dict = {}
    # get all files described and determine the union of all keys
    finfos = []
    info_keys = set()
    for finfo in files:
        # get file metadata in ISATAB notation
        finfo = file_descr(dsmeta, finfo)
        info_keys = info_keys.union(finfo.keys())
        finfos.append(finfo)
    # receiver for data in all columns of the table
    collector_dict = dict(zip(info_keys, [[] for i in range(len(info_keys))]))
    # expand data with missing values across all files/rows
    for finfo in finfos:
        for spec in info_keys:
            fspec = finfo.get(spec, None)
            collector_dict[spec].append(fspec)
    # build the table order
    idx = 1
    idx_map = {}
    assay_name_key = None
    for prop in recognized_assay_props:
        colname = prop[1]
        if colname in idx_map:
            # we already know about this column, that means it has multiple sources
            # and we have processed one already. no need to do anything in addition
            continue
        if prop[0] is None:
            # special case handling
            if colname == 'Protocol REF':
                assay_dict[_get_colkey(idx, colname)] = protocol_ref
                idx += 1
            elif colname in repository_info:
                assay_dict[_get_colkey(idx, colname)] = repository_info[colname]
                idx += 1
            continue

        elif colname not in collector_dict:
            # we got nothing for this column
            continue
        # skip empty
        if not all([v is None for v in collector_dict[colname]]):
            # be able to look up the actual column key in case
            # prev information needs to be replaced by a value from
            # a better source (as determined by the order)
            colkey = idx_map.get(colname, _get_colkey(idx, colname))
            idx_map[colname] = colkey
            assay_dict[colkey] = collector_dict[colname]
            idx += 1
            if colname == 'Assay Name':
                assay_name_key = colkey
            for aux_info, aux_colname in (
                    ('unit_label', 'Unit'),
                    ('term_source', 'Term Source REF'),
                    ('term_accession', 'Term Accession Number')):
                aux_source = '{}_{}'.format(colname, aux_info)
                if aux_source not in collector_dict:
                    # we got nothing on this from any file
                    continue
                assay_dict[_get_colkey(idx, aux_colname)] = collector_dict[aux_source]
                idx += 1

    if assay_name_key is None:
        # we didn't get a single meaningful file
        return None

    # TODO use assay name as index! for join with deface later on
    df = pd.DataFrame(assay_dict, index=assay_dict[assay_name_key])
    return df


def _store_beautiful_table(df, output_directory, fname):
    if df is None:
        return
    if 'Assay Name' in df:
        df = df.sort_values(['Assay Name'])
    df.to_csv(
        opj(output_directory, fname),
        sep="\t",
        index=False)


def _gather_protocol_parameters_from_df(df, protocols):
    params = set()
    protos = None
    for i, col in enumerate(list(df.columns) + ['Protocol REF']):
        if col == 'Protocol REF':
            if protos is not None:
                # we had some before, store
                for p in protos:
                    pdef = protocols.get(p, set()).union(params)
                    protocols[p] = pdef
                if i > len(df.columns) - 1:
                    break
            # this is a protocol definition column,
            # make entry for each unique value
            protos = df.iloc[:, i].unique()
        if col.startswith('Parameter Value['):
            params.add(col[16:-1])


def convert(
        dsmeta,
        filemeta,
        output_directory,
        repository_info=None):
    if pd is None:
        lgr.error(
            "This plugin requires Pandas to be available (error follows)")
        import pandas
        return

    # collect infos about dataset and ISATAB structure for use in investigator
    # template
    info = {}
    if not exists(output_directory):
        lgr.info(
            "creating output directory at '{}'".format(output_directory))
        os.makedirs(output_directory)

    # prep for assay table info
    protocols = OrderedDict()
    for prop in assay_props:
        info[prop] = []

    # pull out essential metadata bits about the dataset itself
    # for study description)
    dsbidsmeta = getprop(dsmeta, ['metadata', 'bids'], {})
    info['name'] = dsbidsmeta.get('shortdescription', dsbidsmeta.get('name', 'TODO'))
    info['author'] = '\t'.join(assure_list(dsbidsmeta.get('author', [])))
    info['keywords'] = '\t'.join(assure_list(dsbidsmeta.get('tag', [])))
    # generate: s_study.txt
    study_df = _get_study_df(dsmeta)
    if study_df.empty:
        # no samples, no assays, no metadataset
        return None

    _gather_protocol_parameters_from_df(study_df, protocols)
    _store_beautiful_table(
        study_df,
        output_directory,
        "s_study.txt")
    info['studytab_filename'] = 's_study.txt'

    deface_df = None
    # all imaging modalities recognized in BIDS
    #TODO maybe fold 'defacemask' into each suffix as a derivative
    for suffix in ('defacemask',
                     'T1w', 'T2w', 'T1map', 'T2map', 'FLAIR', 'FLASH', 'PD',
                     'PDmap', 'PDT2', 'inplaneT1', 'inplaneT2', 'angio',
                     'sbref', 'bold', 'SWImagandphase'):
        # what files do we have for this type
        modfiles = [f for f in filemeta
                    if getprop(f, ['metadata', 'bids', 'suffix'], None) == suffix]
        if not len(modfiles):
            # not files found, try next
            lgr.info(
                "no files match MRI suffix '{}', skipping".format(suffix))
            continue

        df = _get_assay_df(
            dsmeta,
            suffix,
            "Magnetic Resonance Imaging",
            modfiles,
            _describe_file,
            repository_info)
        if df is None:
            continue
        if suffix == 'defacemask':
            # rename columns to strip index
            df.columns = [c[6:] for c in df.columns]
            df.rename(columns={'Raw Data File': 'Derived Data File'}, inplace=True)
            df.drop(
                ['Assay Name', 'Sample Name'] +
                [c for c in df.columns if c.startswith('Factor')],
                axis=1,
                inplace=True)
            deface_df = df
            # re-prefix for merge logic compatibility below
            deface_df.columns = [_get_colkey(i, c) for i, c in enumerate(df.columns)]
            # do not save separate, but include into the others as a derivative
            continue
        elif deface_df is not None:
            # get any factor columns, put last in final table
            factors = []
            # find where they stat
            for i, c in enumerate(df.columns):
                if '_Factor Value[' in c:
                    factors = df.columns[i:]
                    break
            factor_df = df[factors]
            df.drop(factors, axis=1, inplace=True)
            # merge relevant rows from deface df (hstack), by matching assay name
            df = df.join(deface_df, rsuffix='_deface')
            df.columns = [c[:-7] if c.endswith('_deface') else c for c in df.columns]
            # cannot have overlapping columns, we removed the factor before
            df = df.join(factor_df)
        # rename columns to strip index
        df.columns = [c[6:] for c in df.columns]
        # parse df to gather protocol info
        _gather_protocol_parameters_from_df(df, protocols)
        # store
        assay_fname = "a_mri_{}.txt".format(suffix.lower())
        _store_beautiful_table(
            df,
            output_directory,
            assay_fname)
        info['assay_fname'].append(assay_fname)
        info['assay_techtype'].append('nuclear magnetic resonance')
        info['assay_techtype_term'].append('OBI:0000182')
        info['assay_techtype_termsrc'].append('OBI')
        info['assay_measurementtype'].append('MRI Scanner')
        info['assay_measurementtype_term'].append('ERO:MRI_Scanner')
        info['assay_measurementtype_termsrc'].append('ERO')

    # non-MRI modalities
    for modlabel, assaylabel, protoref in (
            ('physio', 'physio', "Physiological Measurement"),
            ('stim', 'stimulation', "Stimulation")):
        df = _get_assay_df(
            dsmeta,
            modlabel,
            protoref,
            [f for f in filemeta
             if getprop(f, ['metadata', 'bids', 'suffix'], None) == suffix],
            _describe_file,
            repository_info)
        if df is None:
            continue
        # rename columns to strip index
        df.columns = [c[6:] for c in df.columns]
        assay_fname = "a_{}.txt".format(assaylabel)
        _store_beautiful_table(
            df,
            output_directory,
            assay_fname)
        info['assay_fname'].append(assay_fname)
        # ATM we cannot say anything definitive about these
        info['assay_techtype'].append('TODO')
        info['assay_techtype_term'].append('TODO')
        info['assay_techtype_termsrc'].append('TODO')
        info['assay_measurementtype'].append(assaylabel)
        info['assay_measurementtype_term'].append('TODO')
        info['assay_measurementtype_termsrc'].append('TODO')

    # post-proc assay-props for output
    for prop in assay_props:
        info[prop] = '\t'.join(assure_list(info[prop]))

    info['protocol_name'] = '\t'.join(protocols.keys())
    for k in ('type', 'term', 'termsrc'):
        info['protocol_{}'.format(k)] = '\t'.join(
            protocol_defs.get(p, {}).get(k, 'TODO') for p in protocols)
    info['protocol_parameters'] = '\t'.join(
            '; '.join(sorted(protocols[p])) for p in protocols)
    return info


#
# Make it work seamlessly as a datalad export plugin
#
@build_doc
class BIDS2Scidata(Interface):
    """BIDS to ISA-Tab converter"""
    _params_ = dict(
        dataset=Parameter(
            args=("-d", "--dataset"),
            doc="""Dataset to query for metadata of a BIDS-compatible dataset.
            The queried dataset itself does not have to be a BIDS dataset.
            If not dataset is given, an attempt is made to identify the dataset
            based on the current working directory.""",
            constraints=EnsureDataset() | EnsureNone()),
        path=Parameter(
            args=('path',),
            doc="""path to a BIDS-compatible dataset to export metadata on"""),
        repo_name=Parameter(
            args=('--repo-name',),
            doc="""data repository name to be used in assay tables.
            Example: OpenfMRI"""),
        repo_accession=Parameter(
            args=('--repo-accession',),
            doc="""data repository accession number to be used in assay tables.
            Example: ds000113d"""),
        repo_url=Parameter(
            args=('--repo-url',),
            doc="""data repository URL to be used in assay tables.
            Example: https://openfmri.org/dataset/ds000113d"""),
        output=Parameter(
            args=('--output',),
            doc="""directory where ISA-TAB files will be stored""",)
    )

    @staticmethod
    @datasetmethod(name='bids2scidata')
    @eval_results
    def __call__(repo_name, repo_accession, repo_url, path=None, output=None, dataset=None):
        # we need this resource file, no point in starting without it
        default_path = opj(dirname(datalad_neuroimaging.__file__), 'resources', 'isatab',
                'scidata_bids_investigator.txt')
        itmpl_path = cfg.obtain(
            'datalad.plugin.bids2scidata.investigator.template',
            default=default_path)
        if itmpl_path == default_path and not os.path.isabs(default_path):
            # see https://github.com/datalad/datalad/issues/2514
            raise RuntimeError(
                "Do not run within the datalad_neuroimaging source tree")

        if path and dataset is None:
            dataset = path
        dataset = require_dataset(
            dataset, purpose='metadata query', check_installed=True)

        errored = False
        dsmeta = None
        filemeta = []
        for m in metadata(
                path,
                dataset=dataset,
                # BIDS hierarchy might go across multiple dataset
                recursive=True,
                reporton='all',
                return_type='generator',
                result_renderer='disabled'):
            type = m.get('type', None)
            if type not in ('dataset', 'file'):
                continue
            if m.get('status', None) != 'ok':
                errored = errored or m.get('status', None) in ('error', 'impossible')
                yield m
                continue
            if type == 'dataset':
                if dsmeta is not None:
                    lgr.warning(
                        'Found metadata for more than one datasets, '
                        'ignoring their dataset-level metadata')
                    continue
                dsmeta = m
            elif type == 'file':
                filemeta.append(m)
        if errored:
            return

        if not dsmeta or not 'refcommit' in dsmeta:
            yield dict(
                status='error',
                message=("could not find aggregated metadata on path '%s'", path),
                path=dataset.path,
                type='dataset',
                action='bids2scidata',
                logger=lgr)
            return

        lgr.info("Metadata for %i files associated with '%s' on record in %s",
                 len(filemeta),
                 path,
                 dataset)

        if not output:
            output = 'scidata_isatab_{}'.format(dsmeta['refcommit'])

        info = convert(
            dsmeta,
            filemeta,
            output_directory=output,
            repository_info={
                'Comment[Data Repository]': repo_name,
                'Comment[Data Record Accession]': repo_accession,
                'Comment[Data Record URI]': repo_url},
        )
        if info is None:
            yield dict(
                status='error',
                message='dataset does not seem to contain relevant metadata',
                path=dataset.path,
                type='dataset',
                action='bids2scidata',
                logger=lgr)
            return

        itmpl = open(itmpl_path, encoding='utf-8').read()
        with open(opj(output, 'i_Investigation.txt'), 'w', encoding='utf-8') as ifile:
            ifile.write(
                itmpl.format(
                    datalad_version=datalad.__version__,
                    date=datetime.now().strftime('%Y/%m/%d'),
                    repo_name=repo_name,
                    repo_accession=repo_accession,
                    repo_url=repo_url,
                    **info
                ))
        yield dict(
            status='ok',
            path=abspath(output),
            # TODO add switch to make tarball/ZIP
            #type='file',
            type='directory',
            action='bids2scidata',
            logger=lgr)