mangroveorg/datawinners

View on GitHub
datawinners/entity/geo_data.py

Summary

Maintainability
D
1 day
Test Coverage
import json
from collections import OrderedDict

from coverage.html import escape

from mangrove.datastore.entity import get_all_entities
from mangrove.errors.MangroveException import DataObjectNotFound
from mangrove.form_model.field import UniqueIdUIField, field_attributes
from datawinners.utils import is_json



def geo_jsons(manager, entity_type, filters, details, specials, map_view=False, total_in_label=False):
    """Build the JSON string describing every map layer for ``entity_type``.

    The result is a JSON array. Its first element is the base layer (all
    entities matching ``filters``); it is followed by one group per entry of
    ``specials``, each holding one layer per choice of that special field.

    :param manager: database manager; its ``view`` is queried for the
        registration form of ``entity_type`` and it is forwarded to
        ``_geo_json``.
    :param entity_type: entity type name; its capitalised form names the
        base layer.
    :param filters: mapping of field code to selected values; copied with
        ``dict(filters)`` before each use.
    :param details: detail field codes, forwarded to ``_geo_json``.
    :param specials: mapping of field code to a list of choice dicts, each
        read for ``'value'`` and ``'color'``.
    :param map_view: when truthy, choice layers without features are left out.
    :param total_in_label: together with ``map_view``, appends feature counts
        to the legend labels and the group title.
    :return: the layers serialised with ``json.dumps``.
    """
    form_rows = manager.view.registration_form_model_by_entity_type(key=[entity_type], include_docs=True)
    entity_fields = form_rows[0]["doc"]["json_fields"]

    layers = [{
        "name": entity_type.capitalize(),
        "data": _geo_json(manager, entity_type, entity_fields, dict(filters), details),
        "color": "rgb(104, 174, 59)"
    }]

    for special_code in specials:
        # Raises IndexError when no registration field carries this code.
        special_field = [f for f in entity_fields if f['code'] == special_code][0]
        group = {"group": special_field['label'], "data": []}
        feature_total = 0

        for choice in specials[special_code]:
            choice_filters = dict(filters)
            choice_filters[special_code] = choice['value']

            # A choice that the active filter on the same field excludes can
            # never match, so no query is issued for it.
            excluded_by_filter = (special_code in filters.keys()
                                  and choice['value'] not in dict(filters).get(special_code))

            legends = [c['text'] for c in special_field['choices'] if c['val'] == choice['value']]
            if not legends:
                continue

            if excluded_by_filter:
                data = {'features': [], 'type': 'FeatureCollection'}
            else:
                data = _geo_json(manager, entity_type, entity_fields, choice_filters, details)

            feature_count = len(data['features'])
            legend = legends[0]
            if map_view and total_in_label:
                legend += " (" + str(feature_count) + ")"

            if not map_view or feature_count > 0:
                group["data"].append({
                    "name": legend,
                    "data": data,
                    "color": choice['color']
                })
            feature_total += feature_count

        if map_view and total_in_label:
            group["group"] += " Total " + str(feature_total)

        layers.append(group)

    return json.dumps(layers)


def get_first_geocode_field_for_entity_type(entity_all_fields):
    """Return the first field dict whose ``"type"`` is ``"geocode"``.

    :param entity_all_fields: iterable of field dicts, each with a ``"type"`` key.
    :return: the first matching field dict, or ``None`` when there is none.
    """
    matches = []
    for candidate in entity_all_fields:
        if candidate["type"] == "geocode":
            matches.append(candidate)

    if not matches:
        return None
    return matches[0]


def get_location_list_for_entities(first_geocode_field, unique_ids):
    """Collect a GeoJSON point feature for every entity that has a geocode value.

    :param first_geocode_field: field dict; its ``"name"`` is the key looked
        up in each entity's ``data``.
    :param unique_ids: iterable of entities exposing a ``data`` mapping.
    :return: list of features built by ``_to_json_point``; entities whose
        geocode entry is missing, empty or lacks a ``'value'`` key are skipped.
    """
    location_list = []
    for entity in unique_ids:
        value_dict = entity.data.get(first_geocode_field["name"])
        # ``in`` replaces ``dict.has_key``, which no longer exists on Python 3.
        if value_dict and 'value' in value_dict:
            location_list.append(_to_json_point(value_dict["value"]))
    return location_list


def get_location_list_for_datasenders(datasenders):
    """Collect a GeoJSON point feature for every datasender that has a geometry.

    :param datasenders: iterable of entities exposing a ``geometry`` attribute.
    :return: list of features built by ``_to_json_point`` from the first two
        entries of ``geometry["coordinates"]``; entities with a falsy
        geometry are skipped.
    """
    geometries = (sender.geometry for sender in datasenders)
    return [
        _to_json_point((geometry["coordinates"][0], geometry["coordinates"][1]))
        for geometry in geometries
        if geometry
    ]


def _geo_json(dbm, entity_type, entity_fields, filters, details):
    """Build a GeoJSON ``FeatureCollection`` for the entities matching ``filters``.

    :param dbm: database manager forwarded to ``get_all_entities``.
    :param entity_type: entity type name to query.
    :param entity_fields: list of registration field dicts for the type.
    :param filters: mapping of field code to selected values, translated by
        ``_transform_filters``.
    :param details: iterable of detail entries. Plain entries are field codes
        to show; entries that parse as JSON (after swapping single quotes for
        double quotes) describe a combined detail with ``'list'`` and
        ``'label'`` keys. This argument is not modified.
    :return: ``{"type": "FeatureCollection", "features": [...]}``. The feature
        list is empty when the type has no geocode field or when
        ``DataObjectNotFound`` is raised while collecting.
    """
    location_list = []

    try:
        forward_filters, reverse_filters = _transform_filters(filters, entity_fields)
        first_geocode_field = get_first_geocode_field_for_entity_type(entity_fields)
        if first_geocode_field:
            # NOTE(review): 1000 is presumably a result limit - confirm against
            # get_all_entities.
            unique_ids = get_all_entities(
                dbm, [entity_type], 1000, forward_filters, reverse_filters
            )

            # 'q2' is always shown. Work on a copy so the caller's list is not
            # mutated; extending it in place made 'q2' pile up on every call.
            detail_codes = list(details) + ['q2']
            fields_to_show = [field for field in entity_fields if field['code'] in detail_codes]

            details_to_show_combined = []
            for detail in detail_codes:
                candidate = detail.replace("'", '"')
                if is_json(candidate):
                    details_to_show_combined.append(json.loads(candidate))

            field_label_combined = None
            if details_to_show_combined:
                field_label_combined = []
                for dtsc in details_to_show_combined:
                    combined_fields = [field for field in entity_fields if field['code'] in dtsc['list']]
                    field_label_combined.append({
                        'list': _get_field_labels(combined_fields),
                        'label': dtsc['label'],
                    })

            field_label = _get_field_labels(fields_to_show)

            location_list.extend(_get_detail_list_for_entities(
                field_label,
                first_geocode_field,
                unique_ids,
                field_label_combined
            ))

    except DataObjectNotFound:
        # Deliberate best effort: a missing data object yields an empty layer.
        pass

    return {"type": "FeatureCollection", "features": location_list}


def _transform_filters(filters, entity_all_fields):
    d = dict((field['code'], field) for field in entity_all_fields)
    forward_filters = {}
    reverse_filters = {}
    for f in filters:
        if len(f.split(",")) > 1 or d[f]["type"] == field_attributes.UNIQUE_ID_FIELD or d[f]["type"] == field_attributes.TEXT_FIELD:
            if "" not in filters[f]:
                if len(f.split(",")) > 1:
                    reverse_filters[filters[f][0]] = [d[qn]['name'] for qn in f.split(",")]
                else:
                    forward_filters[d[f]['name']] = filters[f][0]
        
        else:
            forward_filters[d[f]['name']] = \
                [choice['text'] for choice in d[f]['choices'] if choice['val'] in filters[f]]
    return forward_filters, reverse_filters


def _get_entity_options(dbm, entity_type):
    """Return ``(short_code, escaped name)`` pairs for the entities of a type.

    :param dbm: database manager forwarded to ``get_all_entities``.
    :param entity_type: entity type name to list.
    :return: list of 2-tuples, one per entity returned by ``get_all_entities``.
    """
    # NOTE(review): ``escape`` is imported from ``coverage.html`` at the top of
    # this file, which looks like an accidental auto-import - confirm the
    # intended escaping helper.
    options = []
    for entity in get_all_entities(dbm, [entity_type]):
        short_code = entity.short_code
        options.append((short_code, escape(entity.data['name']['value'])))
    return options


def _get_field_labels(entity_fields):
    dict_simplified = OrderedDict()
    for field in entity_fields :
        dict_simplified[field['name']] = field['label']
    return dict_simplified


def _get_detail_list_for_entities(entity_field_labels, first_geocode_field, unique_ids, field_label_combined = None):
    detail_list = []
    for entity in unique_ids:
        value_dict = entity.data.get(first_geocode_field["name"])
        if value_dict and value_dict.has_key('value'):
            value = value_dict["value"]
            detail_list.append(_to_json_detail(value, entity_field_labels, entity.data, entity.type_string, field_label_combined))
    return detail_list


def _to_json_detail(value, entity_field_labels, data=None, entity_type=None, fields_to_show_combined=None):
    """Build a GeoJSON point feature carrying the entity's detail properties.

    :param value: coordinate pair forwarded to ``_to_json_point``.
    :param entity_field_labels: mapping of field name to label for the
        individual properties.
    :param data: the entity's data mapping, forwarded to
        ``_simplify_field_data``.
    :param entity_type: entity type, forwarded to ``_simplify_field_data``.
    :param fields_to_show_combined: optional list of dicts with ``'list'``
        (name-to-label mapping) and ``'label'``. For each one an extra
        property keyed by the label is added, whose value joins the listed
        fields' values with ``", "``.
    :return: the feature dict with a ``'properties'`` entry.
    """
    detail_json = _to_json_point(value)
    detail_json['properties'] = _simplify_field_data(data, entity_field_labels, entity_type)

    if fields_to_show_combined is not None:
        for ftsc in fields_to_show_combined:
            combined_value = _simplify_field_data(data, ftsc['list'], entity_type)
            value_string = ''
            for label, details in combined_value.items():
                # The synthetic entity_type entry is not part of the combined text.
                if label == 'entity_type':
                    continue
                # The separator is added only once some text has accumulated,
                # so empty leading values do not produce a leading ", ".
                if value_string:
                    value_string += ', '
                value_string += details['value']
            detail_json["properties"][ftsc['label']] = {'value': value_string, 'label': ftsc['label']}

    return detail_json


def _to_json_point(value):
    point_json = { "type": "Feature", "geometry":
        {
            "type": "Point",
            "coordinates": [
                value[1],
                value[0]
            ]
        }
    }
    return point_json


def _simplify_field_data(data, entity_field_labels, entity_type=None):
    simple_data = OrderedDict()

    if entity_type is not None:
        simple_data['entity_type'] = {}
        simple_data['entity_type']["value"] = entity_type
        simple_data['entity_type']["label"] = ""

    entity_details = [(key, data.get(key)) for key in entity_field_labels if key in data.keys()]

    for key, value_field in entity_details:
        one_field_data = {}
        one_field_data["value"]= value_field["value"]

        if key != "entity_type":
            if key == "mobile_number" and entity_type is None:
                one_field_data["label"]= entity_field_labels["telephone_number"]
            else:
                one_field_data["label"]= entity_field_labels[key]
        else:
            one_field_data["label"] = ""

        simple_data[key] = one_field_data

    return simple_data