sjoerdk/anonapi

View on GitHub
anonapi/mapper.py

Summary

Maintainability
C
1 day
Test Coverage
"""Makes it possible to map a structured file to batch of IDIS jobs.
The file should map each source to its anonymized id, name, etc.
Pre-processing step for creating IDIS jobs

Notes
-----
The main class `Mapping` contains a human readable description, some general
options, and a JobParameterGrid which contains one row per job, one parameter
per column.
Originally, the bare JobParameterGrid was named `Mapping`. Later on this turned out
to be insufficient as more information needed to be saved. Some confusing naming might
still exist in some places in the code, referring to JobParameterGrid objects as
mappings. This should be renamed whenever encountered.
"""
import csv
import locale
import os

from csv import Dialect
from typing import Dict, Iterable, List, Optional, TextIO, Union

from tabulate import tabulate

from anonapi.exceptions import AnonAPIError
from anonapi.logging import get_module_logger
from anonapi.parameters import (
    Parameter,
    ParameterError,
    ParameterSet,
    StudyInstanceUIDIdentifier,
    AccessionNumberIdentifier,
    SourceIdentifierParameter,
    ParameterFactory,
    PseudoName,
    Description,
    ALL_PARAMETERS,
    ParameterParsingError,
)
from collections import defaultdict
from io import StringIO
from pathlib import Path

# Filename for mapping if not specified
DEFAULT_MAPPING_NAME = "anon_mapping.csv"

# Logger shared by everything in this module
logger = get_module_logger(__name__)


class Mapping:
    """All information needed for creating anonymization jobs

    Contains a JobParameterGrid plus description and mapping-wide settings
    such as output dir
    """

    # Headers used in between sections in csv file
    DESCRIPTION_HEADER = "## Description ##"
    OPTIONS_HEADER = "## Options ##"
    GRID_HEADER = "## Mapping ##"
    # Order matters: sections are expected in the file in exactly this order
    ALL_HEADERS = [DESCRIPTION_HEADER, OPTIONS_HEADER, GRID_HEADER]

    def __init__(
        self,
        grid: "JobParameterGrid",
        options: Optional[List[Parameter]] = None,
        description="",
        dialect: Union[str, Dialect] = "excel",
    ):
        """

        Parameters
        ----------
        grid: JobParameterGrid
            The per-job command_table of parameters
        options: List[Parameter], optional
            List of rows that have been set for the entire mapping. Defaults to empty
        description: str, optional
            Human readable description of this mapping. Can contain newline chars.
            Defaults to empty input
        dialect: Union[str, Dialect], optional
            CSV dialect. Which line separator to use etc. Any Dialect or a input
            returned by the list_dialects() function.
            Defaults to 'excel'
        """
        self.grid = grid
        self.options = [] if options is None else options
        self.description = description
        # A dialect name is resolved to the registered dialect; anything else
        # is assumed to be a dialect already and stored as-is
        if isinstance(dialect, str):
            self.dialect = csv.get_dialect(dialect)
        else:
            self.dialect = dialect

    def __len__(self):
        """Number of rows in the grid of this mapping"""
        return len(self.grid)

    def save_to(self, f: TextIO):
        """Write this Mapping to given stream

        Writes three sections, each preceded by its header: description,
        options (one parameter per line) and the parameter grid as csv.

        Parameters
        ----------
        f: TextIO
            Stream to write to
        """
        line_end = self.dialect.lineterminator

        # write description
        f.write(self.DESCRIPTION_HEADER + line_end)
        f.write(self.description)
        f.write(line_end)

        # write options
        f.write(self.OPTIONS_HEADER + line_end)
        f.write(
            line_end.join(
                x.to_string(delimiter=self.dialect.delimiter)
                for x in self.options
            )
        )
        # end the last option line, then leave one empty line before the grid
        f.write(line_end)
        f.write(line_end)

        # write mapping
        f.write(self.GRID_HEADER + line_end)
        self.grid.save(f, dialect=self.dialect)

    @classmethod
    def load(cls, lines: Iterable[str]) -> "Mapping":
        """Load a mapping from lines

        Parameters
        ----------
        lines: Iterable[str]
            Load object from these lines

        Returns
        -------
        Mapping
            Built from the description, options and grid sections in lines

        Raises
        ------
        MappingLoadError
            If lines cannot be read or a required section header is missing
        """

        try:
            # make sure open file handles are read through
            lines = list(lines)
        except OSError as e:
            if "raw readinto() returned invalid length" in str(e):
                raise MappingLoadError(
                    "Cannot load mapping. Is the mapping file opened in any"
                    " editor?"
                ) from e
            else:
                # Unsure which error this is. Can't handle this here.
                raise

        # split content into three sections
        sections = cls.parse_sections(lines)

        # parse_sections strips line endings. Put a newline back in between
        # lines so a multi-line description is not fused into one string
        description = "\n".join(sections[cls.DESCRIPTION_HEADER])

        # NOTE: despite its name, parameter_line_is_empty() is True for lines
        # that DO have content, so this keeps only the non-empty option lines
        options = [
            ParameterFactory.parse_from_string(line)
            for line in sections[cls.OPTIONS_HEADER]
            if parameter_line_is_empty(line)
        ]

        grid_content = os.linesep.join(sections[cls.GRID_HEADER])
        grid = JobParameterGrid.load(StringIO(grid_content))
        return cls(
            grid=grid,
            options=options,
            description=description,
            dialect=sniff_dialect_safe(lines),
        )

    @classmethod
    def parse_sections(cls, lines: Iterable[str]) -> Dict[str, List[str]]:
        """A mapping csv file consists of three sections divided by headers.
         Try to parse each one. Also cleans each line

        Parameters
        ----------
        lines: Iterable[str]
            lines to parse

        Returns
        -------
        Dict
            A dict with all lines under each of the headers in cls.ALL_HEADERS
            Line endings and trailing commas have been stripped. empty lines
            have been removed

        Raises
        ------
        MappingLoadError
            If not all headers can be found or are not in the expected order

        """
        collected = defaultdict(list)
        headers_to_find = cls.ALL_HEADERS.copy()
        header_to_find = headers_to_find.pop(0)
        current_header = None
        # Track what was actually seen. headers_to_find alone is not enough:
        # the last header is popped from it before that header has been found
        found_headers = set()
        for line in lines:
            line = (
                line.replace("\r", "")
                .replace("\n", "")
                .rstrip(",")
                .rstrip(";")
            )
            if not line:  # skip empty lines
                continue
            if header_to_find.lower() in line.lower():
                # this is our header, start recording
                current_header = header_to_find
                found_headers.add(header_to_find)
                # and look for the next one. If there is one.
                if headers_to_find:
                    header_to_find = headers_to_find.pop(0)
                continue  # skip header line itself
            if current_header:
                collected[current_header].append(line)

        # check the results: do we have all headers?
        missing = [x for x in cls.ALL_HEADERS if x not in found_headers]
        if missing:
            raise MappingLoadError(
                f'Could not find required headers "{missing}"'
            )

        return collected

    @property
    def rows(self):
        """All parameters for each row. This includes the parameters in the
        grid as well as the mapping-wide parameters in the options section.

        Grid parameters overrule mapping-wide parameters

        Returns
        -------
            List[Parameter] for each row in grid
        """
        rows = []
        for grid_row in self.grid.rows:
            # keyed on parameter class, so a grid parameter replaces the
            # mapping-wide option of the same class
            row_dict = {type(x): x for x in self.options}
            row_dict.update({type(x): x for x in grid_row})
            rows.append(list(row_dict.values()))
        return rows

    def add_row(self, parameters: List[Parameter]):
        """Add the given list of parameters to this mapping as a new grid row

        Parameters
        ----------
        parameters: List[Parameter]
            The parameters to create one job

        """
        self.grid.append_row(parameters)

    def add_grid(self, grid: "JobParameterGrid"):
        """Add each row in given grid to this mapping"""
        self.grid.append_parameter_grid(grid)

    def to_string(self):
        """Human readable multi-line description of this mapping

        Returns
        -------
        str
            The description followed by a table of at most 5 grid rows
        """
        output = self.description
        output += "\n" + self.grid.to_table_string(max_rows=5)
        return output


def sniff_dialect(lines: Iterable[str]) -> Dialect:
    """Determine the csv dialect (separator character etc.) of the given lines

    Each line is sniffed on its own, allowing only ';' and ',' as delimiter.
    The dialect found for the first line that can be sniffed is returned.

    Parameters
    ----------
    lines: Iterable[str]
        Lines to sniff

    Raises
    ------
    MapperError:
        When dialect cannot be determined from any of the lines

    """
    sniffer = csv.Sniffer()
    for candidate in lines:
        try:
            detected = sniffer.sniff(candidate, delimiters=";,")
        except csv.Error:
            # nothing to go on in this line, the next one might work
            continue
        return detected
    raise MapperError("Could not determine dialect for csv file")


def parameter_line_is_empty(line_in: str) -> bool:
    """True if the input line contains anything besides spaces, commas and
    semicolons. For cleaning

    Notes
    -----
    The name suggests the opposite: a line holding only separators or spaces
    yields False here. Mapping.load uses a True result to keep a line.
    """
    return any(char not in " ,;" for char in line_in)


def sniff_dialect_safe(
    lines: Iterable[str], default: Optional[str] = "excel"
) -> Dialect:
    """Like sniff_dialect, but falls back to a default instead of raising

    Parameters
    ----------
    lines: Iterable[str]
        lines to sniff
    default: Optional[str]
        return this dialect when dialect cannot be determined. Defaults to 'excel'.
        Returned unchanged, so with the default this is the name 'excel' and
        not a Dialect
    """
    try:
        dialect = sniff_dialect(lines)
    except MapperError as e:  # this could be a single-column mapping
        logger.debug(
            f"could not determine dialect, guessing "
            f"'{default}'. Original error: '{e}'"
        )
        return default
    return dialect


class JobParameterGrid:
    """A persistable 2D grid of job rows. Each row belongs to one job"""

    def __init__(self, rows: List[List[Parameter]]):
        """

        Parameters
        ----------
        rows: List[List[Parameter]]
            One list of parameters per job. Rows can differ in length
        """
        self.rows = rows

    def __len__(self):
        """Number of rows in this grid"""
        return len(self.rows)

    def width(self) -> int:
        """Maximum number of columns in this grid. 0 for a grid without rows"""
        return max((len(x) for x in self.rows), default=0)

    def append_row(self, row: List[Parameter]):
        """Append the given row to this grid"""
        self.rows.append(row)

    def append_parameter_grid(self, grid: "JobParameterGrid"):
        """Append all rows in the given grid"""
        for row in grid.rows:
            self.append_row(row)

    def save(self, f: TextIO, dialect: Union[str, Dialect] = "excel"):
        """Write rows as CSV, with one column for each parameter type that
        occurs in this grid

        Parameters
        ----------
        f: TextIO
            Write to this
        dialect: Union[str, Dialect], optional
            CSV dialect. Which line separator to use etc. Any Dialect or a input
            returned by the list_dialects() function.
            Defaults to 'excel'

        """
        if isinstance(dialect, str):  # cast to Dialect instance if needed
            dialect = csv.get_dialect(dialect)

        # Which parameter types are there?
        params = self.parameter_types()

        writer = csv.DictWriter(
            f,
            dialect=dialect,
            fieldnames=[x.field_name for x in params],
        )
        writer.writeheader()
        for row in self.rows:
            writer.writerow({x.field_name: x.value for x in row})

    @classmethod
    def load(cls, f):
        """Load an instance from open file handle

        Parameters
        ----------
        f
            file object opened for reading

        Returns
        -------
        JobParameterGrid
            Loaded from data in f

        Raises
        ------
        MappingLoadError:
            If mapping could not be loaded

        """
        lines = f.readlines()
        dialect = sniff_dialect_safe(lines)
        reader = csv.DictReader(lines, dialect=dialect)
        parameters = []
        row = None  # keeps the name bound for the error message below
        try:
            for row in reader:
                parameters.append(
                    JobParameterGrid.parse_job_parameter_row(row)
                )
        except ParameterParsingError as e:
            # include the offending row so the problem can be found in the file
            raise MappingLoadError(f"Problem parsing '{row}'") from e

        return cls(parameters)

    @staticmethod
    def parse_job_parameter_row(row: Dict[str, str]) -> List[Parameter]:
        """Parse a dict of strings as Parameters, perform some initial checks for
        more informative error messages

        Parameters
        ----------
        row: Dict[str, str]
            dict with parameter key: parameter value. As output by csv.DictReader

        Returns
        -------
        List[Parameter]
            One parameter for each item in row

        Raises
        ------
        ParameterParsingError
            If there are more values than usable column headers, or if a
            key, value pair cannot be parsed
        """
        # check common problem: missing column header
        keys = [x for x in row.keys() if x]  # remove None and empty
        values = list(row.values())
        if len(keys) < len(values):
            raise ParameterParsingError(
                f"Missing column header. I've got {len(values)} "
                f"values: {values} but only {len(keys)} headers: "
                f"({keys}). I don't know which is which now."
            )

        return [
            ParameterFactory.parse_from_key_value(key, val)
            for key, val in row.items()
        ]

    def parameter_types(self):
        """Sorted list of all classes of Parameter found in this list

        Useful if you want to make a nice command_table for example

        Returns
        -------
        List[class]
            Each distinct class of Parameter, ordered in the same order as
            anonapi.parameters.ALL_PARAMETERS. Classes that are not in
            ALL_PARAMETERS are left out

        """
        types = {type(param) for row in self.rows for param in row}
        return [x for x in ALL_PARAMETERS if x in types]

    def to_table_string(self, max_rows: Optional[int] = None):
        """A source - patient_id command_table with a small header

        Parameters
        ----------
        max_rows: Optional[int]
            If given, show at most this many rows of content. If not given,
            prints all

        Returns
        -------
        str:
            Header line stating the total number of rows, followed by a table
            with one source and one pseudo name column

        """
        # remember parameter list can be sparse
        table = defaultdict(list)
        types = [SourceIdentifierParameter, PseudoName]

        if max_rows is None:
            rows = self.rows
        else:
            rows = self.rows[:max_rows]

        for row in rows:
            typed_row = {type(x): x for x in row}
            for param_type in types:
                try:
                    instance = typed_row[param_type]
                except KeyError:
                    # not in this row: use the value of a default instance so
                    # all columns keep the same length
                    instance = param_type()
                table[param_type.field_name].append(instance.value)

        if max_rows is None:
            output = f"Parameter grid with {len(self.rows)} rows:\n\n"
        else:
            output = (
                f"Parameter grid with {len(self.rows)} rows (showing at most "
                f"{max_rows}):\n\n"
            )
        output += tabulate(table, headers="keys", tablefmt="simple")
        return output


class MappingFile:
    """A file that contains a mapping"""

    def __init__(self, file_path: Path):
        """

        Parameters
        ----------
        file_path: Path
            Location that the mapping is saved to and loaded from
        """
        self.file_path = file_path

    def save_mapping(self, mapping: Mapping):
        """Write the given mapping to this file, replacing existing content"""
        # newline="" leaves line endings to the csv dialect of the mapping
        with open(self.file_path, "w", newline="") as f:
            mapping.save_to(f)

    def load_mapping(self) -> Mapping:
        """Load Mapping from the path of this file

        Returns
        -------
        Mapping

        Raises
        ------
        MappingLoadError
            If mapping cannot be loaded

        """
        # open() is what raises FileNotFoundError, so it has to be inside try
        try:
            with open(self.file_path, newline="") as f:
                return Mapping.load(f)
        except FileNotFoundError as e:
            raise MappingLoadError("Could not load mapping") from e

    def get_mapping(self) -> Mapping:
        """Load Mapping from the path of this file. Like load_mapping, but
        reports any mapper problem as a MapperError that names the path

        Returns
        -------
        Mapping
            Loaded from the path of this file

        Raises
        ------
        MapperError
            When the file does not exist or the mapping in it cannot be loaded

        """
        try:
            with open(self.file_path, newline="") as f:
                return Mapping.load(f)
        except (FileNotFoundError, MapperError) as e:
            raise MapperError(
                f"Could not load mapping at '{self.file_path}'"
            ) from e


class ExampleJobParameterGrid(JobParameterGrid):
    """A parameter grid pre-filled with two example rows, showing two ways of
    identifying a source: by StudyInstanceUID and by AccessionNumber
    """

    def __init__(self):
        # Example 1: source identified by StudyInstanceUID
        study_uid_row = [
            SourceIdentifierParameter(
                StudyInstanceUIDIdentifier("123.12121212.12345678")
            ),
            PseudoName("PatientA"),
            Description(
                "A study which should be retrieved from PACS, "
                "identified by StudyInstanceUID"
            ),
        ]

        # Example 2: source identified by AccessionNumber
        accession_number_row = [
            SourceIdentifierParameter(
                AccessionNumberIdentifier("12345678.1234567")
            ),
            PseudoName("PatientB"),
            Description(
                "A study which should be retrieved from PACS, "
                "identified by AccessionNumber"
            ),
        ]

        super().__init__(rows=[study_uid_row, accession_number_row])


class MappingParameterSet(ParameterSet):
    """One row of a mapping as a parameter set. Starts out with default
    parameters and requires a source to be present
    """

    def __init__(self, parameters: List[Parameter]):
        """Create a parameter set to put in a mapping. The set is first filled
        with defaults and then updated with the given parameters

        Parameters
        ----------
        parameters: List[Parameter]
            The parameters in this set

        Raises
        ------
        MapperError
            If mapping does not contain a source parameter. Without a source this
            is not valid to put in a mapping.

        """
        defaults = self.get_default_parameters()
        super().__init__(parameters=defaults)
        self.update(parameters)

        # Only called for its check: a set without a source is rejected
        try:
            self.get_source_parameter()
        except ParameterError as e:
            raise MapperError(
                "Invalid set of parameters for mapping: no source found. Where"
                " should the data come from? Original error"
            ) from e

    @staticmethod
    def get_default_parameters() -> ParameterSet:
        """Generate some reasonable defaults for pseudo name and description"""
        pseudo_name = ParameterFactory.generate_pseudo_name()
        description = ParameterFactory.generate_description()
        return ParameterSet(parameters=[pseudo_name, description])


def get_local_dialect() -> Dialect:
    """Try to obtain best match for local CSV dialect

    Uses the heuristic that a decimal comma goes together with a semicolon
    (';') as list separator. With any other decimal point, plain csv.excel
    is returned
    """
    uses_decimal_comma = locale.localeconv()["decimal_point"] == ","
    return ColonDelimited() if uses_decimal_comma else csv.excel


class MapperError(AnonAPIError):
    """Base class for errors raised in this module"""


class MappingLoadError(MapperError):
    """Raised when a mapping cannot be loaded"""


class ColonDelimited(csv.excel):
    """The excel csv dialect with a semicolon (';') as delimiter instead of a
    comma. Note that the class name says colon
    """

    delimiter = ";"