datalad/datalad-neuroimaging

View on GitHub
datalad_neuroimaging/extractors/dicom.py

Summary

Maintainability
B
4 hrs
Test Coverage
# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the datalad package for the
#   copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""DICOM metadata extractor"""
from __future__ import absolute_import

from six import string_types
import os.path as op
import logging
lgr = logging.getLogger('datalad.metadata.extractors.dicom')
from datalad.log import log_progress

try:
    # renamed for 1.0 release
    import pydicom as dcm
    from pydicom.errors import InvalidDicomError
    from pydicom.dicomdir import DicomDir
except ImportError:  # pragma: no cover
    import dicom as dcm
    from dicom.errors import InvalidDicomError
    from dicom.dicomdir import DicomDir

try:
    from collections.abc import MutableSequence
except ImportError:
    from collections import MutableSequence

from distutils.version import LooseVersion
from datalad_deprecated.metadata.definitions import vocabulary_id
from datalad_deprecated.metadata.extractors.base import BaseMetadataExtractor


# pydicom 2.0.0 renamed PersonName3 to PersonName: bind whichever class the
# installed release provides to a single module-level name.
# NOTE(review): LooseVersion comes from distutils, which is deprecated
# (PEP 632) and not shipped with Python 3.12+ -- confirm the supported Python
# range, or consider feature detection on dcm.valuerep instead.
PersonName = dcm.valuerep.PersonName3 \
    if LooseVersion(dcm.__version__) < "2.0.0" else dcm.valuerep.PersonName
# Data types we care to extract/handle
# (string_types is itself a tuple of types; isinstance() accepts such nested
# tuples, so it can be listed alongside the plain classes)
_SCALAR_TYPES = (
    int, float, string_types, dcm.valuerep.DSfloat, dcm.valuerep.IS,
    PersonName)
# Since pydicom 1.0 MultiValue is no longer subclass of list
# but of collections{.abc,}.MutableSequence . To make sure we
# do not miss any of those - match to both
_SEQUENCE_TYPES = (list, tuple, dcm.multival.MultiValue, MutableSequence)


def _is_good_type(v):
    """Tell whether `v` is of a type this extractor knows how to handle.

    A value qualifies if it is an instance of one of `_SCALAR_TYPES`, or if
    it is an instance of one of `_SEQUENCE_TYPES` and every item in it
    qualifies in turn (checked recursively). An empty sequence qualifies.

    Parameters
    ----------
    v
      Value to inspect.

    Returns
    -------
    bool
      True if the value can be handled, False otherwise.
    """
    if isinstance(v, _SCALAR_TYPES):
        return True
    if isinstance(v, _SEQUENCE_TYPES):
        return all(_is_good_type(item) for item in v)
    # be explicit rather than falling off the end and returning None
    return False


def _sanitize_unicode(s):
    """Return `s` without NUL characters and without surrounding whitespace.

    NUL characters are removed first, so whitespace that only becomes leading
    or trailing after their removal is stripped too.
    """
    without_nul = s.replace(u"\u0000", "")
    return without_nul.strip()


def _convert_value(v):
    """Convert a value read from a DICOM structure into a plain Python one.

    Conversion is selected by the *exact* type of `v` (subclasses are not
    matched by the type-specific rules):

    - None, int, float: returned unchanged
    - str: NUL characters removed and whitespace stripped
    - bytes: decoded as ASCII (undecodable bytes replaced), then treated
      like str
    - DSfloat / IS / PersonName: converted to float / int / str
    - any instance of `_SEQUENCE_TYPES`: a list with every item converted
      recursively
    - anything else: returned unchanged
    """
    if v is None:
        return None
    kind = type(v)
    if kind in (int, float):
        return v
    if kind == str:
        return _sanitize_unicode(v)
    if kind == bytes:
        return _sanitize_unicode(v.decode('ascii', 'replace'))
    if kind == dcm.valuerep.DSfloat:
        return float(v)
    if kind == dcm.valuerep.IS:
        return int(v)
    if kind == PersonName:
        return str(v)
    if isinstance(v, _SEQUENCE_TYPES):
        return [_convert_value(item) for item in v]
    # unknown type: hand it back untouched
    return v


# Vocabulary declaration that MetadataExtractor.get_metadata() attaches to the
# dataset-level metadata under the '@context' key.
context = {
    'dicom': {
        # TODO: switch to http://dicom.nema.org/resources/ontology/DCM/
        # -- that, however, requires mapping plain text terms to numbers
        '@id': 'http://semantic-dicom.org/dcm#',
        'description': 'DICOM vocabulary (seemingly incomplete)',
        'type': vocabulary_id}
}


def _struct2dict(struct):
    """Build a plain dict from the supported attributes of `struct`.

    Parameters
    ----------
    struct
      Object whose `dir()` method yields candidate attribute names
      (here: a structure read by pydicom).

    Returns
    -------
    dict
      Maps attribute name to its converted value (see `_convert_value`).
      Names that cannot be accessed are ignored silently; values of an
      unsupported type (see `_is_good_type`) are skipped with a debug
      message.
    """
    converted = {}
    for name in struct.dir():
        if not hasattr(struct, name):
            continue
        raw = getattr(struct, name)
        if not _is_good_type(raw):
            lgr.debug(
                "Skipping field %s of the type %s which we do not handle",
                name, type(raw)
            )
            continue
        converted[name] = _convert_value(raw)
    return converted


class MetadataExtractor(BaseMetadataExtractor):
    """Extract metadata from DICOM files and aggregate it per image series.

    Files are grouped by their SeriesInstanceUID. The record of a series
    contains only those fields whose values are identical across all files
    of the series, plus a 'SeriesDirectory' entry naming the directory that
    holds the series' files.
    """

    # Per-file fields listed for exclusion.
    # NOTE(review): presumably consumed by the base class / metadata machinery
    # when summarizing unique values -- it is not referenced in this module;
    # confirm against BaseMetadataExtractor.
    _unique_exclude = {
        "AcquisitionTime",
        "ContentTime",
        "InstanceCreationTime",
        "InstanceNumber",
        # this one is actually debatable, if there is a reasonable use case
        # where one would know such a UID and needed to find the dataset with
        # this file, we should keep it in -- but I don't know any ATM
        # and we do still have SeriesInstanceUID
        "SOPInstanceUID",
        "SliceLocation",
        "TemporalPositionIdentifier",
        "TriggerTime",
        "WindowCenter",
        "WindowWidth",
    }

    def get_metadata(self, dataset, content):
        """Extract DICOM metadata from all files in `self.paths`.

        Files whose basename starts with 'PSg', files pydicom rejects as
        invalid DICOM, and DICOMDIR files are skipped.

        Parameters
        ----------
        dataset
          Not evaluated by this implementation; the dataset-level record is
          always produced.
        content
          If true, additionally report the full metadata record of every
          DICOM file that was read.

        Returns
        -------
        tuple
          `(dsmeta, content_meta)`. `dsmeta` is a dict with the keys
          '@context' and 'Series' (one record per image series).
          `content_meta` is an iterable of `(path, record)` pairs if
          `content` is true, and an empty list otherwise.

        Raises
        ------
        AttributeError
          If a readable DICOM file has no SeriesInstanceUID.
        """
        # SeriesInstanceUID -> (series record, list of member files)
        imgseries = {}
        # file path -> full per-file record (only filled if `content`)
        imgs = {}
        log_progress(
            lgr.info,
            'extractordicom',
            'Start DICOM metadata extraction from %s', self.ds,
            total=len(self.paths),
            label='DICOM metadata extraction',
            unit=' Files',
        )
        for f in self.paths:
            absfp = op.join(self.ds.path, f)
            log_progress(
                lgr.info,
                'extractordicom',
                'Extract DICOM metadata from %s', absfp,
                update=1,
                increment=True)

            if op.basename(f).startswith('PSg'):
                # ignore those dicom files, since they appear to not contain
                # any relevant metadata for image series, but causing trouble
                # (see gh-2210). We might want to change that whenever we get
                # a better understanding of how to deal with those files.
                lgr.debug("Ignoring DICOM file %s", f)
                continue

            try:
                d = dcm.read_file(absfp, defer_size=1000, stop_before_pixels=True)
            except InvalidDicomError:
                # we can only ignore
                lgr.debug('"%s" does not look like a DICOM file, skipped', f)
                continue

            if isinstance(d, DicomDir):
                lgr.debug("%s appears to be a DICOMDIR file. Extraction not yet"
                          " implemented, skipped", f)
                continue

            ddict = None
            if content:
                ddict = _struct2dict(d)
                imgs[f] = ddict

            series_uid = d.SeriesInstanceUID
            # directory containing this file (good for sorted DICOM datasets)
            series_dir = op.dirname(f) or op.curdir
            if series_uid not in imgseries:
                # start with a copy of the metadata of the first dicom in a series
                series = _struct2dict(d) if ddict is None else ddict.copy()
                series['SeriesDirectory'] = series_dir
                series_files = []
            else:
                series, series_files = imgseries[series_uid]
                # compare incoming with existing metadata set:
                # only keep keys that exist and have values that are identical
                # across all images in the series.
                # 'SeriesDirectory' is not a DICOM attribute but was added by
                # us, hence it must be compared against the directory of the
                # incoming file -- looking it up on `d` always yields None and
                # would drop the key from every series with more than one file
                series = {
                    k: v for k, v in series.items()
                    if (series_dir if k == 'SeriesDirectory'
                        else _convert_value(getattr(d, k, None))) == v
                }
            series_files.append(f)
            # store
            imgseries[series_uid] = (series, series_files)
        log_progress(
            lgr.info,
            'extractordicom',
            'Finished DICOM metadata extraction from %s', self.ds
        )

        dsmeta = {
            '@context': context,
            'Series': [info for info, files in imgseries.values()]
        }
        return (
            dsmeta,
            # yield the corresponding series description for each file
            imgs.items() if content else []
        )