mangroveorg/datawinners

View on GitHub
datawinners/search/submission_index.py

Summary

Maintainability
F
1 wk
Test Coverage
import json
import logging
from collections import OrderedDict
from types import NoneType

from babel.dates import format_datetime
import elasticutils
from pyelasticsearch.exceptions import ElasticHttpError, ElasticHttpNotFoundError

from datawinners.project.couch_view_helper import get_all_projects
from datawinners.project.views.utils import is_original_question_changed_from_choice_answer_type, \
    convert_choice_options_to_options_text
from datawinners.settings import ELASTIC_SEARCH_URL, ELASTIC_SEARCH_TIMEOUT
from mangrove.datastore.documents import ProjectDocument
from datawinners.search.submission_index_meta_fields import submission_meta_fields
from mangrove.form_model.field import DateField, UniqueIdField, SelectField, FieldSet, SelectOneExternalField
from datawinners.search.submission_index_constants import SubmissionIndexConstants
from datawinners.search.submission_index_helper import SubmissionIndexUpdateHandler
from mangrove.errors.MangroveException import DataObjectNotFound
from datawinners.search.index_utils import get_elasticsearch_handle, get_field_definition, _add_date_field_mapping, \
    es_unique_id_code_field_name, \
    es_questionnaire_field_name, _add_text_field_mapping, es_unique_id_details_field_name,\
    lookup_entity, get_field_definition_with_binary_type, _add_binary_field_mapping
from mangrove.datastore.entity import get_by_short_code_include_voided, Entity, Contact
from mangrove.form_model.form_model import FormModel, get_form_model_by_entity_type
from mangrove.form_model.project import Project
from mangrove.form_model.field import PhotoField, AudioField, VideoField

logger = logging.getLogger("datawinners")
UNKNOWN = "N/A"
ES_NUMBER_OF_TYPES_SUPPORTED = 115


def get_code_from_es_field_name(es_field_name, form_model_id):
    for item in es_field_name.split('_'):
        if item != 'value' and item != form_model_id:
            return item


def create_submission_mapping(dbm, latest_form_model, old_form_model):
    SubmissionSearchStore(dbm, latest_form_model, old_form_model).update_store()


class SubmissionSearchStore():
    def __init__(self, dbm, latest_form_model, old_form_model):
        self.dbm = dbm
        self.es = get_elasticsearch_handle()
        self.latest_form_model = latest_form_model
        self.old_form_model = old_form_model

    def update_store(self):
        mapping = self.get_mappings()
        try:
            mapping_old = self.get_old_mappings()
            if mapping_old:
                self._verify_change_involving_date_field(mapping, mapping_old)
                self._verify_unique_id_change()
            if not mapping_old or self._has_fields_changed(mapping, mapping_old):
                self.es.put_mapping(self.dbm.database_name, self.latest_form_model.id, mapping)

        except (ElasticHttpError, FieldTypeChangeException) as e:
            if isinstance(e, FieldTypeChangeException) or e[1].startswith('MergeMappingException'):
                self.recreate_and_populate_elastic_store()
            else:
                logger.error(e)
        except Exception as e:
            logger.error(e)

    def is_mapping_out_of_sync(self):
        try:
            mapping = self.get_mappings()
            mapping_old = self.get_old_mappings()
            if mapping_old:
                self._verify_change_involving_date_field(mapping, mapping_old)
                self._verify_unique_id_change()
            return not mapping_old or self._has_fields_changed(mapping, mapping_old)
        except Exception as e:
            return True
        
    def _has_fields_changed(self, mapping, mapping_old):
        old_mapping_properties = mapping_old.values()[0].values()[0].values()[0]['properties']
        new_fields_with_format = set(
            [k + f["fields"][k + "_value"]["type"] for k, f in mapping.values()[0]['properties'].items() if f.get('fields')])
        old_fields_with_format = set(
            [k + f["fields"][k + "_value"]["type"] for k, f in old_mapping_properties.items() if
             f.get("fields")])

        return new_fields_with_format != old_fields_with_format

    def _verify_change_involving_date_field(self, mapping, mapping_old):
        old_mapping_properties = mapping_old.values()[0].values()[0].values()[0]['properties']
        new_mapping_properties = mapping.values()[0]['properties']
        old_q_codes = old_mapping_properties.keys()
        new_q_codes = new_mapping_properties.keys()
        new_date_fields_with_format = set(
            [k + f["fields"][k + "_value"]["format"] for k, f in new_mapping_properties.items()
             if f.get('fields') and f["fields"][k + "_value"]["type"] == "date" and k in old_q_codes])
        old_date_fields_with_format = set(
            [k + f["fields"][k + "_value"]["format"] for k, f in old_mapping_properties.items()
             if f.get('fields') and f["fields"][k + "_value"]["type"] == "date" and k in new_q_codes])
        date_format_changed = old_date_fields_with_format != new_date_fields_with_format
        if date_format_changed:
            raise FieldTypeChangeException()

    def get_old_mappings(self):
        mapping_old = []
        try:
            mapping_old = self.es.get_mapping(index=self.dbm.database_name, doc_type=self.latest_form_model.id)
        except ElasticHttpNotFoundError as e:
            pass
        return mapping_old

    def get_mappings(self):
        fields_definition = []
        fields_definition.extend(get_submission_meta_fields())
        self._get_submission_fields(fields_definition, self.latest_form_model.fields)
        mapping = self.get_fields_mapping_by_field_def(doc_type=self.latest_form_model.id,
                                                       fields_definition=fields_definition)
        return mapping

    def _get_submission_fields(self, fields_definition, fields, parent_field_name=None):
        for field in fields:
            if isinstance(field, UniqueIdField):
                unique_id_field_name = es_questionnaire_field_name(field.code, self.latest_form_model.id,
                                                                   parent_field_name)
                fields_definition.append(
                    get_field_definition(field, field_name=es_unique_id_code_field_name(unique_id_field_name)))

            if isinstance(field, FieldSet):
                if field.is_group():
                    self._get_submission_fields(fields_definition, field.fields, field.code)
                else:
                    es_field_name = es_questionnaire_field_name(field.code, self.latest_form_model.id,
                                                                            parent_field_name)
                    fields_definition.append(get_field_definition_with_binary_type(field, field_name=es_field_name))
                continue
            fields_definition.append(
                get_field_definition(field,
                                     field_name=es_questionnaire_field_name(field.code, self.latest_form_model.id,
                                                                            parent_field_name)))

    def recreate_elastic_store(self):
        try:
            self.es.send_request('DELETE', [self.dbm.database_name, self.latest_form_model.id, '_mapping'])
        except ElasticHttpNotFoundError as e:
            pass
        self.es.put_mapping(self.dbm.database_name, self.latest_form_model.id, self.get_mappings())

    def recreate_and_populate_elastic_store(self):
        self.recreate_elastic_store()
        from datawinners.search.submission_index_task import async_populate_submission_index

        async_populate_submission_index.delay(self.dbm.database_name, self.latest_form_model.id)

    # def _add_text_field_mapping_for_submission(self, mapping_fields, field_def):
    #     name = field_def["name"]
    #     mapping_fields.update(
    #         {name: {"type": "multi_field", "fields": {
    #             name: {"type": "string"},
    #             name + "_value": {"type": "string", "index_analyzer": "sort_analyzer", "include_in_all": False},
    #             name + "_exact": {"type": "string", "index": "not_analyzed", "include_in_all": False},
    #         }}})

    def get_fields_mapping_by_field_def(self, doc_type, fields_definition):
        """
        fields_definition   = [{"name":form_model_id_q1, "type":"date", "date_format":"MM-yyyy"},{"name":form_model_id_q1, "type":"string"}]
        """
        mapping_fields = {}
        mapping = {"date_detection": False, "properties": mapping_fields}
        methods_dict = {"date": _add_date_field_mapping, "binary": _add_binary_field_mapping}
        
        for field_def in fields_definition:
            method = methods_dict.get(field_def.get("type"), _add_text_field_mapping)
            method(mapping_fields, field_def)

        ds_mapping = self._add_data_sender_details_to_mapping()
        all_id_fields = [field for field in self.latest_form_model.fields if
                         isinstance(field, UniqueIdField)]
        id_field_mapping = self._add_id_field_details_to_mapping(self.latest_form_model.id, all_id_fields)
        mapping_fields.update(ds_mapping)
        if bool(id_field_mapping):
            mapping_fields.update(id_field_mapping)
        return {doc_type: mapping}

    def _verify_unique_id_change(self):
        if self.old_form_model:
            old_unique_id_types = [(field.code, field.unique_id_type) for field in self.old_form_model.fields if
                                   isinstance(field, UniqueIdField)]
            new_unique_id_types = [(field.code, field.unique_id_type) for field in self.latest_form_model.fields if
                                   isinstance(field, UniqueIdField)]
            old_unique_id_types.sort()
            new_unique_id_types.sort()

            if old_unique_id_types != new_unique_id_types:
                raise FieldTypeChangeException()

    def _add_id_field_details_to_mapping(self, questionnaire_id, all_id_fields):
        id_field_mapping = {}
        for id_field in all_id_fields:
            key = es_unique_id_details_field_name(questionnaire_id + '_' + id_field.code)
            unique_field_mapping = {}
            entity_type = get_form_model_by_entity_type(self.dbm, [id_field.unique_id_type])
            if entity_type is not None and not isinstance(entity_type, NoneType):
                for field in entity_type.fields:
                    if field.type is "date":
                        unique_field_mapping.update({field.code: {'type': "date"}})
                    elif field.type is "double":
                        unique_field_mapping.update({field.code: {'type': "double"}})
                    else:
                        unique_field_mapping.update({field.code: {'type': "string", 'index_analyzer': 'sort_analyzer'}})
                id_field_mapping.update({key: {'properties': unique_field_mapping}})

        return id_field_mapping

    def _add_data_sender_details_to_mapping(self):
        return {"datasender": {
            "properties": {
                "email": {
                    "type": "string", 'index_analyzer': 'sort_analyzer'
                },
                "geo_code": {
                    "type": "double"
                },
                "id": {
                    "type": "string", 'index_analyzer': 'sort_analyzer'
                },
                "mobile_number": {
                    "type": "string", 'index_analyzer': 'sort_analyzer'
                },
                "name": {
                    "type": "string", 'index_analyzer': 'sort_analyzer'
                }
            }
        }}


class FieldTypeChangeException(Exception):
    def __str__(self):
        return self.message


def get_submission_meta_fields():
    return submission_meta_fields


def update_submission_search_for_datasender_edition(dbm, short_code, datasender_dict):
    kwargs = {"%s%s" % (SubmissionIndexConstants.DATASENDER_ID_KEY, "_value"): short_code}
    name = datasender_dict['name'] if datasender_dict['name'] else datasender_dict['mobile_number']
    fields_mapping = {SubmissionIndexConstants.DATASENDER_NAME_KEY: name, 'datasender': datasender_dict}
    project_form_model_ids = [project.id for project in get_all_projects(dbm, short_code)]
    chunk, i = ES_NUMBER_OF_TYPES_SUPPORTED, 0
    while i <= len(project_form_model_ids) / chunk:
        process_by_chunk_questionnaire(dbm, project_form_model_ids[i*chunk: (i+1) * chunk - 1], fields_mapping, kwargs)
        i += 1

def process_by_chunk_questionnaire(dbm, project_form_model_ids, fields_mapping, kwargs):
    assert len(project_form_model_ids) < ES_NUMBER_OF_TYPES_SUPPORTED
    query = elasticutils.S().es(urls=ELASTIC_SEARCH_URL, timeout=ELASTIC_SEARCH_TIMEOUT).indexes(
        dbm.database_name).doctypes(*project_form_model_ids)
    query = query[:query.count()].filter(**kwargs)

    for survey_response in query.values_dict('void'):
        SubmissionIndexUpdateHandler(dbm.database_name, survey_response._type).update_field_in_submission_index(
            survey_response._id, fields_mapping)


def _get_submissions_for_unique_id_entry(args, dbm, project):
    query = elasticutils.S().es(urls=ELASTIC_SEARCH_URL, timeout=ELASTIC_SEARCH_TIMEOUT).indexes(
        dbm.database_name).doctypes(project.id)
    query = query[:query.count()].filter(**args)
    return query


def update_submission_search_for_subject_edition(dbm, unique_id_type, subject_details):
    projects = []
    for row in dbm.load_all_rows_in_view('projects_by_subject_type', key=unique_id_type[0], include_docs=True):
        projects.append(Project.new_from_doc(dbm, ProjectDocument.wrap(row['doc'])))
    for project in projects:
        entity_field_code = None
        for field in project.entity_questions:
            if [field.unique_id_type] == unique_id_type:
                entity_field_code = field.code

        if entity_field_code:
            unique_id_field_name = es_questionnaire_field_name(entity_field_code, project.id)

            fields_mapping = {unique_id_field_name: subject_details.get('q2'),
                              unique_id_field_name + '_details': subject_details}
            args = {es_unique_id_code_field_name(unique_id_field_name): subject_details.get('q6')}

            query = _get_submissions_for_unique_id_entry(args, dbm, project)

            for survey_response in query.values_dict('void'):
                SubmissionIndexUpdateHandler(dbm.database_name, project.id).update_field_in_submission_index(
                    survey_response._id, fields_mapping)


def update_submission_search_index(submission_doc, dbm, refresh_index=True, form_model=None, bulk=False):
    es = get_elasticsearch_handle()
    if form_model is None:
        form_model = FormModel.get(dbm, submission_doc.form_model_id)
    search_dict = _meta_fields(submission_doc, dbm)
    _update_with_form_model_fields(dbm, submission_doc, search_dict, form_model)
    if bulk:
        return es.index_op(search_dict, doc_type=form_model.id, index=dbm.database_name, id=submission_doc.id)
    es.index(dbm.database_name, form_model.id, search_dict, id=submission_doc.id, refresh=refresh_index)


def status_message(status):
    return "Success" if status else "Error"


# TODO manage_index
def get_datasender_info(dbm, submission_doc):
    datasender_dict = {}
    if submission_doc.owner_uid:
        datasender_dict = _lookup_contact_by_uid(dbm, submission_doc.owner_uid)
    else:
        datasender_dict['name'] = submission_doc.created_by
        datasender_dict['id'] = UNKNOWN
        datasender_dict['location'] = []
        datasender_dict['email'] = UNKNOWN
        datasender_dict['geo_code'] = []
        datasender_dict['mobile_number'] = submission_doc.created_by

    return datasender_dict


def _meta_fields(submission_doc, dbm):
    search_dict = {}
    datasender_dict = get_datasender_info(dbm, submission_doc)
    search_dict.update({"status": status_message(submission_doc.status)})
    search_dict.update({"date": format_datetime(submission_doc.submitted_on, "MMM. dd, yyyy, hh:mm a", locale="en")})
    search_dict.update({"ds_id": datasender_dict.get('id', '')})
    search_dict.update({"ds_name": datasender_dict.get('name', '')})
    search_dict.update({"datasender": datasender_dict})
    search_dict.update({"error_msg": submission_doc.error_message})
    return search_dict


def _lookup_contact_by_uid(dbm, uid):
    ds_dict = {}
    try:
        if uid:
            contact = Contact.get(dbm, uid)
            mobile_number = contact.value('mobile_number')
            name = contact.value('name')
            ds_dict['mobile_number'] = mobile_number
            ds_dict['name'] = name if name else mobile_number
            ds_dict['email'] = contact.value('email') if contact.value('email') else UNKNOWN
            ds_dict['location'] = contact.value('location') if contact.value('location') else []
            ds_dict['geo_code'] = contact.geometry.get('coordinates') if contact.geometry.get('coordinates') else []
            ds_dict['id'] = contact.short_code
    except Exception:
        pass
    return ds_dict




# TODO:This is required only for the migration for creating submission indexes.To be removed following release10
def _get_select_field_by_revision(field, form_model, submission_doc):
    field_by_revision = form_model.get_field_by_code_and_rev(field.code, submission_doc.form_model_revision)
    return field_by_revision if field_by_revision else field


def _get_options_map(original_field):
    options_map = {}
    for option in original_field.options:
        options_map.update({option['val']: option['text']})
    return options_map


def _get_select_field_answer_from_snapshot(entry, field_for_revision):
    options = field_for_revision.get_options_map()
    value_list = []
    for answer_value in list(entry):
        value_list.append(options[answer_value])
    return ",".join(value_list)


def _fetch_single_select_answer(choices, field):
    if choices:
        choice_text = field.get_value_by_option(choices, default=choices)
    else:
        choice_text = ""

    return choice_text


def _fetch_multi_select_answer(choices, field):
    if choices:
        choice_text = [field.get_value_by_option(option, default=option) for option in choices.split(' ')]
    else:
        choice_text = []

    return choice_text


def _update_choice_value(entry, field):
    choices = entry.get(field.code)
    if field.is_single_select:
        entry[field.code] = _fetch_single_select_answer(choices, field)
    else:
        entry[field.code] = _fetch_multi_select_answer(choices, field)


def group_has_answers(group_answers, group_field):
    if not group_answers:
        return
    return filter(lambda field: group_answers[0].get(field.code), group_field.fields)


def _update_repeat_fields_with_choice_values(repeat_entries, repeat_field):
    for entry in repeat_entries:
        for field in repeat_field.fields:
            if field.is_field_set and field.is_group():
                group_answers = entry.get(field.code)
                if group_has_answers(group_answers, field):
                    _update_repeat_fields_with_choice_values(group_answers, field)
                else:
                    entry[field.code] = ''
            elif isinstance(field, SelectField):
                _update_choice_value(entry, field)


def _update_search_dict(dbm, form_model, fields, search_dict, submission_doc, submission_values, parent_field_name=None):
    for field in fields:
        entry = submission_values.get(field.code)
        label_to_be_displayed = get_label_to_be_displayed(entry, field, form_model, submission_doc)
        es_field_name = es_questionnaire_field_name(field.code, form_model.id, parent_field_name)

        if label_to_be_displayed:
            if 'media' not in search_dict.keys():
                search_dict.update({'media': {}})

            if isinstance(field, PhotoField):
                search_dict['media'].update({es_field_name:
                    {
                        'type': 'image',
                        'value': entry,
                        'download_link': '/download/attachment/' + submission_doc.id + '/' + entry,
                        'preview_link': '/download/attachment/' + submission_doc.id + '/preview_' + entry,
                    }
                })

            if isinstance(field, AudioField):
                search_dict['media'].update({es_field_name:
                    {
                        'type': 'audio',
                        'value': entry,
                        'download_link': '/download/attachment/' + submission_doc.id + '/' + entry,
                    }
                })

            if isinstance(field, VideoField):
                search_dict['media'].update({es_field_name:
                    {
                        'type': 'video',
                        'value': entry,
                        'download_link': '/download/attachment/' + submission_doc.id + '/' + entry,
                    }
                })

            if isinstance(field, FieldSet):
                if field.is_group():
                    for value in entry:
                        _update_search_dict(dbm, form_model, field.fields, search_dict, submission_doc, value, field.code)
                else:
                    _update_repeat_fields_with_choice_values(label_to_be_displayed, field)
                    _update_name_unique_code(dbm, label_to_be_displayed, field)
                    search_dict.update({es_field_name: json.dumps(label_to_be_displayed)})
            elif field.is_entity_field:
                entity, choice_to_entity_entry = get_entity(dbm, entry, field, form_model, submission_doc)
                if entry:
                    search_dict.update({es_unique_id_details_field_name(es_field_name): entity})
                    search_dict.update({es_unique_id_code_field_name(es_field_name): choice_to_entity_entry or UNKNOWN})
                search_dict.update({es_field_name: entry and entity.get('q2')})
            else:
                search_dict.update({es_field_name: label_to_be_displayed})

    search_dict.update({'void': submission_doc.void})
    search_dict.update({'is_anonymous': submission_doc.is_anonymous_submission})


def get_label_to_be_displayed(entry, field, form_model, submission_doc):
    if field.type == "select":
        old_field = _get_select_field_by_revision(field, form_model, submission_doc)
        if old_field.type == "select":
            entry = old_field.get_option_value_list(entry)
        elif old_field.type == "select1":
            entry = ",".join(old_field.get_option_value_list(entry))
    elif field.type == "select_one_external":
        old_field = _get_select_field_by_revision(field, form_model, submission_doc)
        if isinstance(old_field, SelectOneExternalField):
            itemsets_data = form_model.get_attachments('itemsets.csv')
            entry = old_field.get_option_value_list(entry, itemsets_data)
    elif field.type == "select1":
        old_field = _get_select_field_by_revision(field, form_model, submission_doc)
        if old_field.type == "select":
            entry = old_field.get_option_value_list(entry)
        elif old_field.type == "select1":
            entry = ",".join(old_field.get_option_value_list(entry))
    elif field.type == 'text':
        field_for_revision = form_model.get_field_by_code_and_rev(field.code, submission_doc.form_model_revision)
        if isinstance(field_for_revision, SelectField):
            entry = _get_select_field_answer_from_snapshot(entry, field_for_revision)
    elif field.type == "date":
        try:
            if form_model.revision != submission_doc.form_model_revision:
                old_submission_value = entry
                to_format = field.date_format
                field_for_revision = form_model.get_field_by_code_and_rev(field.code,
                                                                          submission_doc.form_model_revision)
                if isinstance(field_for_revision, DateField):
                    current_date = field_for_revision.__date__(entry)
                    entry = current_date.strftime(DateField.DATE_DICTIONARY.get(to_format))
                elif isinstance(field_for_revision, SelectField):
                    entry = _get_select_field_answer_from_snapshot(entry, field_for_revision)
                logger.info("Converting old date submission from %s to %s" % (old_submission_value, entry))
        except Exception:
            pass
    return entry


def get_entity(dbm, entry, field, form_model, submission_doc):
    choice_to_entity_entry = entry
    original_field = form_model.get_field_by_code_and_rev(field.code, submission_doc.form_model_revision)
    if is_original_question_changed_from_choice_answer_type(original_field, field):
        choice_to_entity_entry = convert_choice_options_to_options_text(original_field, entry)
    return lookup_entity(dbm, choice_to_entity_entry, [field.unique_id_type]), choice_to_entity_entry


def _update_name_unique_code(dbm, repeat_entries, fieldset_field):
    for entry in repeat_entries:
        for field in fieldset_field.fields:
            if isinstance(field, UniqueIdField):
                unique_code = entry.get(field.code)
                unique_id_name = lookup_entity(dbm, str(unique_code), [field.unique_id_type]).get('q2')
                entry[field.code + '_unique_code'] = unique_code if unique_code else ''
                entry[field.code] = unique_id_name
            elif isinstance(field, FieldSet):
                _update_name_unique_code(dbm, entry.get(field.code), field)


def _update_with_form_model_fields(dbm, submission_doc, search_dict, form_model):
    # Submission value may have capitalized keys in some cases. This conversion is to do
    # case insensitive lookup.
    submission_values = OrderedDict((k, v) for k, v in submission_doc.values.iteritems())
    _update_search_dict(dbm, form_model, form_model.fields, search_dict, submission_doc, submission_values)
    return search_dict


def get_unregistered_datasenders(dbm, questionnaire_id):
    facets = elasticutils.S().es(urls=ELASTIC_SEARCH_URL, timeout=ELASTIC_SEARCH_TIMEOUT).indexes(dbm.database_name) \
        .doctypes(questionnaire_id).filter(is_anonymous=True, ds_id='n/a', void=False) \
        .facet('ds_name_exact', filtered=True).facet_counts()['ds_name_exact']

    return [facet['term'] for facet in facets]


def get_unregistered_datasenders_count(dbm, questionnaire_id):
    return len(get_unregistered_datasenders(dbm, questionnaire_id))


def get_non_deleted_submission_count(dbm, questionnaire_id):
    return elasticutils.S().es(urls=ELASTIC_SEARCH_URL, timeout=ELASTIC_SEARCH_TIMEOUT) \
        .indexes(dbm.database_name).doctypes(questionnaire_id) \
        .filter(void=False).count()