CIMAC-CIDC/cidc-schemas

cidc_schemas/template_reader.py

# -*- coding: utf-8 -*-

"""Defines the `XlTemplateReader` class for reading/validating templates from Excel templates."""

import logging
from itertools import dropwhile, zip_longest
from typing import Dict, List, Tuple, Union, BinaryIO, Iterator, NamedTuple, Optional
from warnings import filterwarnings

filterwarnings(
    action="ignore",
    category=UserWarning,
    message="Data Validation extension is not supported",
    module="openpyxl",
)

import openpyxl

from .template import Template
from .template_writer import RowType, row_type_from_string
from .json_validation import validate_instance

logger = logging.getLogger("cidc_schemas.template_reader")


# A template row holds a worksheet row's number, its RowType annotation, and its cell values
class TemplateRow(NamedTuple):
    row_num: int
    row_type: RowType
    values: Union[List, Tuple]


# A mapping from RowType to a list of rows with that type
RowGroup = Dict[RowType, list]


class ValidationError(Exception):
    pass


class XlTemplateReader:
    """
    Reader and validator for Excel templates.
    """

    def __init__(self, template: Dict[str, List[TemplateRow]]):
        """
        Initialize a template reader.

        Arguments:
            template {Dict[str, List[TemplateRow]]} -- a mapping from worksheet names to worksheet rows
        """
        self.template = template

        # Mapping from worksheet names to rows grouped by type
        self.grouped_rows: Dict[str, RowGroup] = self._group_worksheet_rows()

        self.visited_fields = set()

    @staticmethod
    def _clean_value_row(values):
        """Strip whitespace from string values and drop trailing empty cells from a row."""
        rev_values = values[::-1]
        clean = list(dropwhile(lambda v: v is None, rev_values))
        clean = [v.strip() if isinstance(v, str) else v for v in clean]
        return clean[::-1]
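    # For example (illustrative values only):
    #   _clean_value_row([" a ", None, "b ", None, None]) -> ["a", None, "b"]
    # Trailing empty cells are dropped, interior Nones are kept, and string
    # values lose their surrounding whitespace.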

    @staticmethod
    def from_excel(xlsx_path: Union[str, BinaryIO]):
        """
        Initialize an Excel template reader from an Excel file.

        Arguments:
            xlsx_path {Union[str, BinaryIO]} -- path to the Excel file, or the open file itself.

        Returns:
            Tuple[XlTemplateReader, List[str]] -- the reader and a list of error messages
            encountered while parsing the workbook.
        """

        # Load the Excel file
        workbook = openpyxl.load_workbook(xlsx_path)

        template = {}
        errors = []
        for worksheet_name in workbook.sheetnames:
            if worksheet_name in Template.ignored_worksheets:
                continue

            worksheet = workbook[worksheet_name]
            rows = []
            header_width = 0
            for row_num, row in enumerate(worksheet.iter_rows(), start=1):
                # Convert to string and extract type annotation
                typ, *values = [col.value for col in row]
                row_type = row_type_from_string(typ)

                # If the entire row is empty, skip it (e.g., blank rows below the data table)
                if not any(values):
                    continue

                # If no recognized row type is found but the row has data, raise an error
                if not row_type:
                    raise ValidationError(
                        f"No recognized row type found in row {worksheet_name}/{row_num}.\n"
                        f"Add #skip to column A to skip this row.\n"
                        f"Add #title to column A if this is a title row.\n"
                        f"Add #preamble to column A if this is a preamble row.\n"
                        f"Add #header to column A if this is a data header row.\n"
                        f"Add #data to column A if this is a data row."
                    )

                # Filter empty cells from the end of the header row and record its width
                if row_type == RowType.HEADER:
                    values = XlTemplateReader._clean_value_row(values)
                    header_width = len(values)

                # Filter empty cells from the end of a data row
                elif row_type == RowType.DATA:
                    if not header_width:
                        errors.append(
                            f"Encountered data row (#{row_num} in worksheet {worksheet_name!r}) before header row"
                        )

                    values = XlTemplateReader._clean_value_row(values)
                    if len(values) > header_width:
                        errors.append(
                            f"Encountered data row (#{row_num} in worksheet {worksheet_name!r}) wider than header row"
                        )

                elif row_type == RowType.PREAMBLE:
                    values = XlTemplateReader._clean_value_row(values)

                    # Preamble rows are [key, value], so they must have exactly two entries.
                    # If the value is optional and missing (i.e., None), only [key] remains
                    # after _clean_value_row, so add the None back.
                    if len(values) == 1:
                        values.append(None)
                    elif len(values) != 2:
                        errors.append(
                            f"Encountered preamble row (#{row_num} in worksheet {worksheet_name!r}) with width {len(values)} (expected 2)"
                        )

                # Reassemble parsed row and add to rows
                new_row = TemplateRow(row_num, row_type, values)
                rows.append(new_row)
            template[worksheet_name] = rows

        return XlTemplateReader(template), errors
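
    # The reader's `template` attribute maps each worksheet name to its parsed
    # rows. For example (worksheet and field names are purely illustrative):
    #   {"Sheet1": [TemplateRow(2, RowType.PREAMBLE, ["protocol id", "trial-1"]),
    #               TemplateRow(4, RowType.HEADER, ["sample id", "timepoint"]),
    #               TemplateRow(5, RowType.DATA, ["S-001", "baseline"])]}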

    def _group_worksheet_rows(self) -> Dict[str, RowGroup]:
        """Map worksheet names to rows grouped by row type"""
        grouped_rows = {}
        for name, rows in self.template.items():
            grouped_rows[name] = self._group_rows(rows)
        return grouped_rows

    @staticmethod
    def _group_rows(rows) -> RowGroup:
        """Group rows in a worksheet by their type annotation"""
        # Initialize mapping from row types to lists of row content
        row_groups: Dict[RowType, List] = {row_type: [] for row_type in RowType}

        for row in rows:
            row_groups[row.row_type].append(row)

        return row_groups
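
    # Every RowType appears as a key in the result, possibly mapped to an empty
    # list, so callers can index by row type without checking for missing keys.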

    def _get_schema(
        self, key: str, schema: Dict[str, dict]
    ) -> Tuple[Optional[dict], Optional[str]]:
        """Try to find a schema for the given template key"""
        entity_name = Template._process_fieldname(key)
        if entity_name not in schema:
            return None, f"Found unexpected column {entity_name!r}"
        # Add a note saying this field was accessed
        self.visited_fields.add(entity_name)
        return schema[entity_name], None

    def _get_data_schemas(
        self, row_groups, data_schemas: Dict[str, dict], ignore_unexpected: bool
    ) -> Tuple[List[Optional[dict]], List[str]]:
        """Transform data table into a list of entity name + schema pairs"""
        header_row = row_groups[RowType.HEADER][0]
        data_rows = row_groups[RowType.DATA]

        errors = []

        # Ensure every data row has the right number of entries
        n_columns = len(header_row.values)
        for data_row in data_rows:
            n_entries = len(XlTemplateReader._clean_value_row(data_row.values))
            if n_entries > n_columns:
                errors.append(
                    f"Row {data_row.row_num} has {n_entries - n_columns} unexpected extra values."
                )

        schemas = []
        for header in header_row.values:
            if header:
                sch, err = self._get_schema(header, data_schemas)
                if err and not ignore_unexpected:
                    errors.append(err)
                schemas.append(sch)

        return schemas, errors
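
    # For non-empty headers, the returned schemas list parallels the header row;
    # unexpected columns get a None placeholder so downstream validation can skip them.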

    def iter_errors(self, template: Template) -> Iterator[str]:
        """Lazily yield validation error messages for every worksheet in the template."""
        for name, schema in template.worksheets.items():
            yield from self._validate_worksheet(name, schema)

    def validate(self, template: Template) -> bool:
        """
        Validate a populated Excel template against a template schema.

        Arguments:
            template {Template} -- a template object containing the expected structure of the template
        Raises:
            ValidationError -- if the .xlsx file is invalid.
        Returns:
            True -- if everything is valid, otherwise raises an exception with validation reporting
        """

        invalid_messages = list(self.iter_errors(template))
        if invalid_messages:
            feedback = "\n".join(map(str, invalid_messages))
            raise ValidationError("\n" + feedback)

        return True
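
    # Callers that want a report instead of an exception can use iter_errors
    # directly, e.g. (illustrative; `reader` and `some_template` are hypothetical):
    #   for message in reader.iter_errors(some_template):
    #       logger.warning(message)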

    def _make_validation_error(
        self, worksheet_name: str, field_name: str, row_num: int, message: str
    ) -> str:
        return f"Error in worksheet {worksheet_name!r}, row {row_num}, field {field_name!r}: {message}"

    def _validate_instance(self, value, schema):
        # All fields in an Excel template are required unless their
        # schema explicitly sets "allow_empty"
        return validate_instance(
            value, schema, is_required=(not schema.get("allow_empty"))
        )
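
    # For example (illustrative schema): {"type": "string", "allow_empty": True}
    # accepts a blank cell, while {"type": "string"} reports a blank cell as a
    # validation error.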

    def _validate_worksheet(self, worksheet_name: str, ws_schema: dict) -> Iterator[str]:
        """Validate rows in a worksheet, yielding validation error messages."""
        self.visited_fields.clear()

        # If no worksheet is found, return only that error.
        if worksheet_name not in self.grouped_rows:
            yield f"Expected worksheet {worksheet_name!r} not found"
            return
        row_groups = self.grouped_rows[worksheet_name]

        if "preamble_rows" in ws_schema:
            # Validate preamble rows
            preamble_schemas = ws_schema["preamble_rows"]
            for row in row_groups[RowType.PREAMBLE]:
                key, value = row.values[0], row.values[1]
                if isinstance(key, str):
                    key = key.strip()
                if isinstance(value, str):
                    value = value.strip()

                schema, error = self._get_schema(key, preamble_schemas)
                if error:
                    yield self._make_validation_error(
                        worksheet_name, key, row.row_num, error
                    )
                    continue
                invalid_reason = self._validate_instance(value, schema)

                if invalid_reason:
                    yield self._make_validation_error(
                        worksheet_name, key, row.row_num, invalid_reason
                    )

            # Ensure that all preamble rows are present
            for name in preamble_schemas.keys():
                if name.strip() not in self.visited_fields:
                    yield (
                        f"Worksheet {worksheet_name!r} is missing expected template row: {name!r}"
                    )

        # Clear visited fields in case a data column has the same header as a preamble row
        self.visited_fields.clear()

        if "data_columns" in ws_schema:
            # Build up flat mapping of data schemas
            flat_data_schemas: Dict[str, dict] = {}
            for section in ws_schema["data_columns"].values():
                flat_data_schemas = {**flat_data_schemas, **section}

            # Validate data rows
            n_headers = len(row_groups[RowType.HEADER])
            if n_headers != 1:
                yield (
                    f"Worksheet {worksheet_name!r}: Exactly one header row expected, but found {n_headers}"
                )
                return

            headers = row_groups[RowType.HEADER][0].values
            if not all(headers):
                # headers.index(None) would fail if the empty cell is "" rather than None
                first_empty_index = next(i for i, h in enumerate(headers) if not h)
                yield (
                    f"Worksheet {worksheet_name!r}: Found an empty header cell at index {first_empty_index}"
                )
                return
            headers = [h.strip() if isinstance(h, str) else h for h in headers]

            ignore_unexpected = bool(
                ws_schema.get("prism_arbitrary_data_merge_pointer")
            )
            data_schemas, errors = self._get_data_schemas(
                row_groups, flat_data_schemas, ignore_unexpected
            )
            for e in errors:
                yield (f"Worksheet {worksheet_name!r}: {e}")

            # Ensure that all expected data columns are present
            for name in flat_data_schemas.keys():
                if name not in self.visited_fields:
                    yield (
                        f"Worksheet {worksheet_name!r} is missing expected template column: {name!r}"
                    )

            for data_row in row_groups[RowType.DATA]:
                for head, value, schema in zip_longest(
                    headers, data_row.values, data_schemas
                ):
                    if schema is None:
                        # we don't check unexpected column
                        # and we already have an error reported on that
                        continue
                    if isinstance(value, str):
                        value = value.strip()

                    invalid_reason = self._validate_instance(value, schema)

                    if invalid_reason:
                        yield self._make_validation_error(
                            worksheet_name, head, data_row.row_num, invalid_reason
                        )