inasafe/inasafe

View on GitHub
safe/report/extractors/aggregate_postprocessors.py

Summary

Maintainability
F
1 wk
Test Coverage
# coding=utf-8
"""Module used to generate context for aggregation postprocessors sections.
"""

from collections import OrderedDict

# noinspection PyUnresolvedReferences

from safe.common.parameters.resource_parameter import ResourceParameter
from safe.definitions.exposure import exposure_population
from safe.definitions.field_groups import (
    age_displaced_count_group,
    gender_displaced_count_group,
    vulnerability_displaced_count_groups)
from safe.definitions.fields import (
    aggregation_name_field,
    displaced_field,
    additional_minimum_needs)
from safe.definitions.minimum_needs import minimum_needs_fields
from safe.definitions.utilities import (
    postprocessor_output_field,
    definition)
from safe.processors.population_post_processors import (
    age_postprocessors,
    gender_postprocessors)
from safe.report.extractors.util import (
    value_from_field_name,
    resolve_from_dictionary)
from safe.utilities.rounding import format_number

__copyright__ = "Copyright 2016, The InaSAFE Project"
__license__ = "GPL version 3"
__email__ = "info@inasafe.org"
__revision__ = '$Format:%H$'


def aggregation_postprocessors_extractor(impact_report, component_metadata):
    """Extracting aggregate result of demographic.

    :param impact_report: the impact report that acts as a proxy to fetch
        all the data that extractor needed
    :type impact_report: safe.report.impact_report.ImpactReport

    :param component_metadata: the component metadata. Used to obtain
        information about the component we want to render
    :type component_metadata: safe.report.report_metadata.
        ReportComponentsMetadata

    :return: context for rendering phase
    :rtype: dict

    .. versionadded:: 4.0
    """
    context = {
        'sections': OrderedDict()
    }

    """Initializations."""

    extra_args = component_metadata.extra_args
    # Find out aggregation report type
    aggregation_summary = impact_report.aggregation_summary
    analysis_layer = impact_report.analysis
    analysis_layer_fields = impact_report.analysis.keywords['inasafe_fields']
    use_rounding = impact_report.impact_function.use_rounding
    use_aggregation = bool(impact_report.impact_function.provenance[
        'aggregation_layer'])
    provenance = impact_report.impact_function.provenance
    exposure_keywords = provenance['exposure_keywords']

    # Get exposure type definition
    exposure_type = definition(exposure_keywords['exposure'])

    # this entire section is only for population exposure type
    if not exposure_type == exposure_population:
        return context

    # check zero displaced (there will be no output to display)
    try:
        displaced_field_name = analysis_layer_fields[displaced_field['key']]
        total_displaced = value_from_field_name(
            displaced_field_name, analysis_layer)

        zero_displaced = False
        if total_displaced == 0:
            zero_displaced = True
    except KeyError:
        # in case no displaced field
        # let each section handled itself
        zero_displaced = False

    context['component_key'] = component_metadata.key
    context['use_aggregation'] = use_aggregation
    context['header'] = resolve_from_dictionary(
        extra_args, 'header')

    group_header_format = resolve_from_dictionary(
        extra_args, ['defaults', 'group_header_format'])

    section_header_format = resolve_from_dictionary(
        extra_args,
        ['defaults', 'section_header_format'])
    if not use_aggregation:
        section_header_format = resolve_from_dictionary(
            extra_args, ['defaults', 'section_header_format_no_aggregation'])

    """Age Groups."""
    age_items = {
        'group': age_displaced_count_group,
        'group_header': group_header_format.format(
            header_name=age_displaced_count_group['header_name']),
        'fields': [postprocessor_output_field(p) for p in age_postprocessors]
    }

    # check age_fields exists
    for field in age_items['fields']:
        if field['key'] in analysis_layer_fields:
            no_age_field = False
            break
    else:
        no_age_field = True

    context['sections']['age'] = []
    age_section_header = section_header_format.format(
        header_name=age_displaced_count_group['header_name'])
    if zero_displaced:
        context['sections']['age'].append(
            {
                'header': age_section_header,
                'empty': True,
                'message': resolve_from_dictionary(
                    extra_args, ['defaults', 'zero_displaced_message'])
            }
        )
    elif no_age_field:
        context['sections']['age'].append(
            {
                'header': age_section_header,
                'empty': True,
                'message': resolve_from_dictionary(
                    extra_args, ['defaults', 'no_age_rate_message'])
            }
        )
    else:
        context['sections']['age'].append(
            create_section(
                aggregation_summary,
                analysis_layer,
                age_items,
                age_section_header,
                use_aggregation=use_aggregation,
                use_rounding=use_rounding,
                extra_component_args=extra_args)
        )

    """Gender Groups."""
    gender_items = {
        'group': gender_displaced_count_group,
        'group_header': group_header_format.format(
            header_name=gender_displaced_count_group['header_name']),
        'fields': [
            postprocessor_output_field(p) for p in gender_postprocessors]
    }

    # check gender_fields exists
    for field in gender_items['fields']:
        if field['key'] in analysis_layer_fields:
            no_gender_field = False
            break
    else:
        no_gender_field = True

    context['sections']['gender'] = []
    gender_section_header = section_header_format.format(
        header_name=gender_displaced_count_group['header_name'])
    if zero_displaced:
        context['sections']['gender'].append(
            {
                'header': gender_section_header,
                'empty': True,
                'message': resolve_from_dictionary(
                    extra_args, ['defaults', 'zero_displaced_message'])
            }
        )
    elif no_gender_field:
        context['sections']['gender'].append(
            {
                'header': gender_section_header,
                'empty': True,
                'message': resolve_from_dictionary(
                    extra_args, ['defaults', 'no_gender_rate_message'])
            }
        )
    else:
        context['sections']['gender'].append(
            create_section(
                aggregation_summary,
                analysis_layer,
                gender_items,
                gender_section_header,
                use_aggregation=use_aggregation,
                use_rounding=use_rounding,
                extra_component_args=extra_args)
        )

    """Vulnerability Groups."""
    context['sections']['vulnerability'] = []
    for vulnerability_group in vulnerability_displaced_count_groups:
        vulnerability_items = {
            'group': vulnerability_group,
            'group_header': group_header_format.format(
                header_name=vulnerability_group['header_name']),
            'fields': [field for field in vulnerability_group['fields']]
        }

        # check vulnerability_fields exists
        for field in vulnerability_items['fields']:
            if field['key'] in analysis_layer_fields:
                no_vulnerability_field = False
                break
        else:
            no_vulnerability_field = True

        vulnerability_section_header = section_header_format.format(
            header_name=vulnerability_group['header_name'])
        if zero_displaced:
            context['sections']['vulnerability'].append(
                {
                    'header': vulnerability_section_header,
                    'empty': True,
                    'message': resolve_from_dictionary(
                        extra_args, ['defaults', 'zero_displaced_message'])
                }
            )
        elif no_vulnerability_field:
            context['sections']['vulnerability'].append(
                {
                    'header': vulnerability_section_header,
                    'empty': True,
                    'message': resolve_from_dictionary(
                        extra_args,
                        ['defaults', 'no_vulnerability_rate_message'])
                }
            )
        else:
            context['sections']['vulnerability'].append(
                create_section(
                    aggregation_summary,
                    analysis_layer,
                    vulnerability_items,
                    vulnerability_section_header,
                    use_aggregation=use_aggregation,
                    use_rounding=use_rounding,
                    extra_component_args=extra_args)
            )

    """Minimum Needs."""
    context['sections']['minimum_needs'] = []
    minimum_needs_section_header = resolve_from_dictionary(
        extra_args, ['sections', 'minimum_needs', 'header'])
    # Don't show minimum needs if there is no displaced
    if zero_displaced:
        context['sections']['minimum_needs'].append(
            {
                'header': minimum_needs_section_header,
                'empty': True,
                'message': resolve_from_dictionary(
                    extra_args, ['defaults', 'zero_displaced_message'])
            }
        )
    # Only provides minimum needs breakdown if there is aggregation layer
    elif use_aggregation:
        # minimum needs should provide unit for column headers
        units_label = []
        minimum_needs_items = {
            'group_header': 'Minimum needs breakdown',
            'fields': minimum_needs_fields + additional_minimum_needs
        }

        for field in minimum_needs_items['fields']:
            unit = None
            if field.get('need_parameter'):
                need = field['need_parameter']
                if isinstance(need, ResourceParameter):
                    unit_abbreviation = need.unit.abbreviation
            elif field.get('unit'):
                need_unit = field.get('unit')
                unit_abbreviation = need_unit.get('abbreviation')

            if unit_abbreviation:
                unit_format = '{unit}'
                unit = unit_format.format(
                    unit=unit_abbreviation)
            units_label.append(unit)

        context['sections']['minimum_needs'].append(
            create_section(
                aggregation_summary,
                analysis_layer,
                minimum_needs_items,
                minimum_needs_section_header,
                units_label=units_label,
                use_rounding=use_rounding,
                extra_component_args=extra_args)
        )
    else:
        sections_not_empty = True
        for _, values in list(context['sections'].items()):
            for value in values:
                if value.get('rows'):
                    break
                else:
                    sections_not_empty = False

        context['sections_not_empty'] = sections_not_empty

    return context


def create_section(
        aggregation_summary, analysis_layer, postprocessor_fields,
        section_header,
        use_aggregation=True,
        units_label=None,
        use_rounding=True,
        extra_component_args=None):
    """Create demographic section context.

    :param aggregation_summary: Aggregation summary
    :type aggregation_summary: qgis.core.QgsVectorlayer

    :param analysis_layer: Analysis layer
    :type analysis_layer: qgis.core.QgsVectorLayer

    :param postprocessor_fields: Postprocessor fields to extract
    :type postprocessor_fields: list[dict]

    :param section_header: Section header text
    :type section_header: qgis.core.QgsVectorLayer

    :param use_aggregation: Flag, if using aggregation layer
    :type use_aggregation: bool

    :param units_label: Unit label for each column
    :type units_label: list[str]

    :param use_rounding: flag for rounding, affect number representations
    :type use_rounding: bool

    :param extra_component_args: extra_args passed from report component
        metadata
    :type extra_component_args: dict

    :return: context for gender section
    :rtype: dict

    .. versionadded:: 4.0
    """
    if use_aggregation:
        return create_section_with_aggregation(
            aggregation_summary, analysis_layer, postprocessor_fields,
            section_header,
            units_label=units_label,
            use_rounding=use_rounding,
            extra_component_args=extra_component_args)
    else:
        return create_section_without_aggregation(
            aggregation_summary, analysis_layer, postprocessor_fields,
            section_header,
            units_label=units_label,
            use_rounding=use_rounding,
            extra_component_args=extra_component_args)


def create_section_with_aggregation(
        aggregation_summary, analysis_layer, postprocessor_fields,
        section_header,
        units_label=None,
        use_rounding=True,
        extra_component_args=None):
    """Create demographic section context with aggregation breakdown.

    :param aggregation_summary: Aggregation summary
    :type aggregation_summary: qgis.core.QgsVectorlayer

    :param analysis_layer: Analysis layer
    :type analysis_layer: qgis.core.QgsVectorLayer

    :param postprocessor_fields: Postprocessor fields to extract
    :type postprocessor_fields: list[dict]

    :param section_header: Section header text
    :type section_header: qgis.core.QgsVectorLayer

    :param units_label: Unit label for each column
    :type units_label: list[str]

    :param use_rounding: flag for rounding, affect number representations
    :type use_rounding: bool

    :param extra_component_args: extra_args passed from report component
        metadata
    :type extra_component_args: dict

    :return: context for gender section
    :rtype: dict

    .. versionadded:: 4.0
    """
    aggregation_summary_fields = aggregation_summary.keywords[
        'inasafe_fields']
    analysis_layer_fields = analysis_layer.keywords[
        'inasafe_fields']

    # retrieving postprocessor
    postprocessors_fields_found = []

    if isinstance(postprocessor_fields, dict):
        output_fields = postprocessor_fields['fields']
    else:
        output_fields = postprocessor_fields

    for output_field in output_fields:
        if output_field['key'] in aggregation_summary_fields:
            postprocessors_fields_found.append(output_field)

    if not postprocessors_fields_found:
        return {}

    # figuring out displaced field
    # no displaced field, can't show result
    if displaced_field['key'] not in analysis_layer_fields:
        return {}
    if displaced_field['key'] not in aggregation_summary_fields:
        return {}

    """Generating header name for columns."""

    # First column header is aggregation title
    default_aggregation_header = resolve_from_dictionary(
        extra_component_args, ['defaults', 'aggregation_header'])
    total_population_header = resolve_from_dictionary(
        extra_component_args, ['defaults', 'total_population_header'])
    columns = [
        aggregation_summary.title() or default_aggregation_header,
        total_population_header,
    ]
    row_values = []

    group_fields_found = []
    start_group_header = True
    for idx, output_field in enumerate(postprocessors_fields_found):

        name = output_field.get('header_name')
        if not name:
            name = output_field.get('name')

        if units_label or output_field.get('unit'):
            unit = None
            if units_label:
                unit = units_label[idx]
            elif output_field.get('unit'):
                unit = output_field.get('unit').get('abbreviation')

            if unit:
                header_format = '{name} [{unit}]'
            else:
                header_format = '{name}'

            header = header_format.format(
                name=name,
                unit=unit)
        else:
            header_format = '{name}'
            header = header_format.format(name=name)

        if isinstance(postprocessor_fields, dict):
            try:
                group_header = postprocessor_fields['group_header']
                group_fields = postprocessor_fields['group']['fields']
                if output_field in group_fields:
                    group_fields_found.append(output_field)
                else:
                    columns.append(header)
                    continue
            except KeyError:
                group_fields_found.append(output_field)

        header_dict = {
            'name': header,
            'group_header': group_header,
            'start_group_header': start_group_header
        }

        start_group_header = False
        columns.append(header_dict)

    """Generating values for rows."""

    for feature in aggregation_summary.getFeatures():

        aggregation_name_index = aggregation_summary.fields().lookupField(
            aggregation_name_field['field_name'])
        displaced_field_name = aggregation_summary_fields[
            displaced_field['key']]
        displaced_field_index = aggregation_summary.fields().lookupField(
            displaced_field_name)

        aggregation_name = feature[aggregation_name_index]
        total_displaced = feature[displaced_field_index]

        if not total_displaced or total_displaced is None:
            # skip if total displaced null
            continue

        total_displaced = format_number(
            feature[displaced_field_index],
            use_rounding=use_rounding,
            is_population=True)

        row = [
            aggregation_name,
            total_displaced,
        ]

        if total_displaced == '0' and not use_rounding:
            continue

        for output_field in postprocessors_fields_found:
            field_name = aggregation_summary_fields[output_field['key']]
            field_index = aggregation_summary.fields().lookupField(field_name)
            value = feature[field_index]

            value = format_number(
                value,
                use_rounding=use_rounding,
                is_population=True)
            row.append(value)

        row_values.append(row)

    """Generating total rows."""

    total_displaced_field_name = analysis_layer_fields[
        displaced_field['key']]
    value = value_from_field_name(
        total_displaced_field_name, analysis_layer)
    value = format_number(
        value,
        use_rounding=use_rounding,
        is_population=True)
    total_header = resolve_from_dictionary(
        extra_component_args, ['defaults', 'total_header'])
    totals = [
        total_header,
        value
    ]
    for output_field in postprocessors_fields_found:
        field_name = analysis_layer_fields[output_field['key']]
        value = value_from_field_name(field_name, analysis_layer)
        value = format_number(
            value,
            use_rounding=use_rounding,
            is_population=True)
        totals.append(value)

    default_notes = resolve_from_dictionary(
        extra_component_args, ['defaults', 'notes'])

    if not isinstance(default_notes, list):
        default_notes = [default_notes]

    try:
        notes = default_notes + postprocessor_fields['group']['notes']
    except (TypeError, KeyError):
        notes = default_notes
        pass

    return {
        'notes': notes,
        'header': section_header,
        'columns': columns,
        'rows': row_values,
        'totals': totals,
        'group_header_colspan': len(group_fields_found)
    }


def create_section_without_aggregation(
        aggregation_summary, analysis_layer, postprocessor_fields,
        section_header,
        units_label=None,
        use_rounding=True,
        extra_component_args=None):
    """Create demographic section context without aggregation.

    :param aggregation_summary: Aggregation summary
    :type aggregation_summary: qgis.core.QgsVectorlayer

    :param analysis_layer: Analysis layer
    :type analysis_layer: qgis.core.QgsVectorLayer

    :param postprocessor_fields: Postprocessor fields to extract
    :type postprocessor_fields: list[dict]

    :param section_header: Section header text
    :type section_header: qgis.core.QgsVectorLayer

    :param units_label: Unit label for each column
    :type units_label: list[str]

    :param use_rounding: flag for rounding, affect number representations
    :type use_rounding: bool

    :param extra_component_args: extra_args passed from report component
        metadata
    :type extra_component_args: dict

    :return: context for gender section
    :rtype: dict
    """
    aggregation_summary_fields = aggregation_summary.keywords[
        'inasafe_fields']
    analysis_layer_fields = analysis_layer.keywords[
        'inasafe_fields']

    # retrieving postprocessor
    postprocessors_fields_found = []
    for output_field in postprocessor_fields['fields']:
        if output_field['key'] in aggregation_summary_fields:
            postprocessors_fields_found.append(output_field)

    if not postprocessors_fields_found:
        return {}

    # figuring out displaced field
    try:
        analysis_layer_fields[displaced_field['key']]
        aggregation_summary_fields[displaced_field['key']]
    except KeyError:
        # no displaced field, can't show result
        return {}

    """Generating header name for columns."""

    # First column header is aggregation title
    total_population_header = resolve_from_dictionary(
        extra_component_args, ['defaults', 'total_population_header'])
    columns = [
        section_header,
        total_population_header,
    ]

    """Generating values for rows."""
    row_values = []

    for idx, output_field in enumerate(postprocessors_fields_found):

        name = output_field.get('header_name')
        if not name:
            name = output_field.get('name')

        row = []
        if units_label or output_field.get('unit'):
            unit = None
            if units_label:
                unit = units_label[idx]
            elif output_field.get('unit'):
                unit = output_field.get('unit').get('abbreviation')

            if unit:
                header_format = '{name} [{unit}]'
            else:
                header_format = '{name}'

            header = header_format.format(
                name=name,
                unit=unit)
        else:
            header_format = '{name}'
            header = header_format.format(name=name)

        row.append(header)

        # if no aggregation layer, then aggregation summary only contain one
        # feature
        field_name = analysis_layer_fields[output_field['key']]
        value = value_from_field_name(
            field_name,
            analysis_layer)
        value = format_number(
            value,
            use_rounding=use_rounding,
            is_population=True)
        row.append(value)

        row_values.append(row)

    return {
        'columns': columns,
        'rows': row_values,
    }