# scitran/core: api/handlers/dataexplorerhandler.py
import copy
import json

from elasticsearch import ElasticsearchException, TransportError, RequestError, helpers

from ..web import base
from .. import config
from ..auth import require_login, require_superuser

log = config.log

"""
EXAMPLE_SESSION_QUERY = {
  "size": 0,
  "query": {
    "match": {
      "_all": "test'"
    }
  },
  "aggs": {
    "by_session": {
      "terms": {
        "field": "session._id",
        "size": 100
      },
      "aggs": {
        "by_top_hit": {
          "top_hits": {
            "size": 1
          }
        }
      }
    }
  }
}

EXAMPLE_ACQUISITION_QUERY = {
  "size": 0,
  "query": {
    "match": {
      "_all": "megan'"
    }
  },
  "aggs": {
    "by_session": {
      "terms": {
        "field": "acquisition._id",
        "size": 100
      },
      "aggs": {
        "by_top_hit": {
          "top_hits": {
            "size": 1
          }
        }
      }
    }
  }
}

EXAMPLE_FILE_QUERY = {
  "size": 100,
  "query": {
    "bool": {
      "must": {
        "match": {
          "_all": "brain"
        }
      },
      "filter": {
        "bool" : {
          "must" : [
             { "term" : {"file.type" : "dicom"}},
             { "term" : {"container_type" : "file"}}
          ]
        }
      }
    }
  }
}
"""


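# Custom index analysis: an n-gram tokenizer (2- to 100-character grams over
# letters, digits, symbols and punctuation) fed through a lowercase filter,
# so partial strings typed by the user can match at index time.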
ANALYSIS = {
    "analyzer": {
        "my_analyzer": {
            "tokenizer": "my_tokenizer",
            "filter": ["lowercase"]
        }
    },
    "tokenizer": {
        "my_tokenizer": {
            "type": "ngram",
            "min_gram": 2,
            "max_gram": 100,
            "token_chars": [
                "letter",
                "digit",
                "symbol",
                "punctuation"
            ]
        }
    }
}

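# Dynamic template applied to every string field: index it as analyzed text
# (n-gram analyzer above, standard analyzer at search time) plus a ".raw"
# keyword sub-field used by the term/terms filters and aggregations below.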
DYNAMIC_TEMPLATES = [
    {
        'string_fields' : {
            'match': '*',
            'match_mapping_type' : 'string',
            'mapping' : {
                'type': 'text',
                'analyzer': 'my_analyzer',
                'search_analyzer': 'standard',
                'index': True,
                "fields": {
                    "raw": {
                        "type": "keyword",
                        "index": True,
                        "ignore_above": 256
                    }
                }
            }
        }
    }
]

MATCH_ALL = {"match_all": {}}

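# Facet query: cardinality aggs count distinct sessions, acquisitions,
# analyses and files; terms aggs bucket the common facet fields, with
# 'missing: "null"' grouping documents that lack the field.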
FACET_QUERY = {
    "size": 0,
    "aggs" : {
        "session_count" : {
            "cardinality" : {
                "field" : "session._id"
            }
        },
        "acquisition_count" : {
            "cardinality" : {
                "field" : "acquisition._id"
            }
        },
        "analysis_count" : {
            "cardinality" : {
                "field" : "analysis._id"
            }
        },
        "file_count" : {
            "cardinality" : {
                "field" : "file._id"
            }
        },
        "by_session": {
            "filter": {"term": {"container_type": "session"}},
            "aggs": {
                "subject.sex" : {
                    "terms" : {
                        "field" : "subject.sex.raw",
                        "size" : 15,
                        "missing": "null"
                    }
                },
                "session.tags" : {
                    "terms" : {
                        "field" : "subject.tags.raw",
                        "size" : 15,
                        "missing": "null"
                    }
                },
                "subject.code" : {
                    "terms" : {
                        "field" : "subject.code.raw",
                        "size" : 15,
                        "missing": "null"
                    }
                },
                "session.timestamp" : {
                    "stats" : { "field" : "session.timestamp"}

                },
            }
        },
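        # 31556952 seconds = 1 Julian year; the bounds below keep subject
        # ages between roughly -1 and 100 years, excluding outliers.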
        "session_age": {
            "filter": {
                "bool" : {
                  "must" : [
                     {"range": {"subject.age": {"gte": -31556952, "lte": 3155695200}}},
                     {"term": {"container_type": "session"}}
                  ]
                }
            },
            "aggs": {
                "subject.age" : {
                    "histogram" : {
                        "field" : "subject.age",
                        "interval" : 31556952,
                        "extended_bounds" : {
                            "min" : -31556952,
                            "max" : 3155695200
                        }
                    }
                }
            }
        },
        "by_file": {
            "filter": {"term": {"container_type": "file"}},
            "aggs": {

                "file.measurements" : {
                    "terms" : {
                        "field" : "file.measurements.raw",
                        "size" : 15,
                        "missing": "null"
                    }
                },
                "file.type" : {
                    "terms" : {
                        "field" : "file.type.raw",
                        "size" : 15,
                        "missing": "null"
                    }
                }
            }
        }
    }
}

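# Painless script evaluated per file hit: true when the file document carries
# a non-empty "info" object. Surfaced to clients as "file.info_exists".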
INFO_EXISTS_SCRIPT = {
    'script': """
        (params['_source'].containsKey('file') &&
        params['_source']['file'].containsKey('info') &&
        !params['_source']['file']['info'].empty)
    """
}


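# _source whitelists: the fields returned for each container type.
# Each list extends its parent container's list.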
SOURCE_COMMON = [
    "group._id",
    "group.label",
    "permissions",
]

SOURCE_COLLECTION = [
    "permissions",
    "collection._id",
    "collection.label",
    "collection.curator",
    "collection.created",
]

SOURCE_PROJECT = SOURCE_COMMON + [
    "project._id",
    "project.label",
]

SOURCE_SESSION = SOURCE_PROJECT + [
    "session._id",
    "session.created",
    "session.label",
    "session.timestamp",
    "subject.code",
]

SOURCE_ACQUISITION = SOURCE_SESSION + [
    "acquisition._id",
    "acquisition.created",
    "acquisition.label",
    "acquisition.timestamp",
]

SOURCE_ANALYSIS = SOURCE_SESSION + [
    "analysis._id",
    "analysis.created",
    "analysis.label",
    "analysis.user",
    "analysis.parent", # TODO: coalesce analysis and file parent keys (analysis.parent.id vs parent._id for file)
]

SOURCE_FILE = SOURCE_ANALYSIS + [
    "file.created",
    "file.measurements",
    "file.name",
    "file.size",
    "file.type",
    "parent",
]

SOURCE = {
    "collection": SOURCE_COLLECTION,
    "project": SOURCE_PROJECT,
    "session": SOURCE_SESSION,
    "acquisition": SOURCE_ACQUISITION,
    "analysis": SOURCE_ANALYSIS,
    "file": SOURCE_FILE
}

# Containers where search doesn't do an aggregation to find results
EXACT_CONTAINERS = ['file', 'collection']


class DataExplorerHandler(base.RequestHandler):
    # pylint: disable=broad-except

    def __init__(self, request=None, response=None):
        super(DataExplorerHandler, self).__init__(request, response)

    def _parse_request(self, request_type='search'):
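        """
        Parse the POST body shared by the search and facet endpoints and
        return (return_type, modified_filters, search_string, size).

        An illustrative request body (field values are made up):

            {
                "return_type": "session",
                "search_string": "amyloid",
                "filters": [{"terms": {"subject.sex": ["male", "null"]}}],
                "all_data": false,
                "size": 100
            }
        """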

        try:
            request = self.request.json_body
        except ValueError:
            if request_type == 'search':
                self.abort(400, 'Must specify return type')
            return None, None, None, 0

        # Parse and validate return_type
        return_type = request.get('return_type')
        if not return_type or return_type not in ['collection', 'project', 'session', 'acquisition', 'analysis', 'file']:
            if request_type == 'search':
                self.abort(400, 'Must specify a valid return type')

        # Parse and "validate" filters, allowed to be non-existent
        filters = request.get('filters', [])
        if type(filters) is not list:
            self.abort(400, 'filters must be a list')

        modified_filters = []

        for f in filters:
            if f.get('terms'):
                for k, v in f['terms'].iteritems():
                    if "null" in v:
                        if isinstance(v, list):
                            v.remove("null")
                        # JSON strings arrive as unicode in Python 2
                        elif isinstance(v, basestring):
                            v = None
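                        # Match documents where the field is missing entirely
                        # (for dotted keys, the parent object must still exist);
                        # any remaining values are OR'd in as a terms clause below.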
                        null_filter = {
                            'bool': {
                                'should': [
                                    {
                                        'bool': {
                                            'must': [
                                                {
                                                    'bool':{
                                                        'must_not': [
                                                            {
                                                                'exists': {'field': k}
                                                            }
                                                        ]
                                                    }
                                                }
                                            ]
                                        }
                                    }
                                ]
                            }
                        }
                        if len(k.split('.')) > 1:
                            null_filter['bool']['should'][0]['bool']['must'].append({'exists': {'field': k.split('.')[0]}})
                        if v:
                            null_filter['bool']['should'].append({'terms': {k+'.raw': v}})
                        modified_filters.append(null_filter)

                    else:
                        modified_filters.append({'terms': {k+'.raw': v}})
            elif f.get('term'):
                # Search raw field
                for k, v in f['term'].iteritems():
                    modified_filters.append({'term': {k+'.raw': v}})
            else:
                modified_filters.append(f)

        # Add permissions filter unless the user requested all data or is a superuser
        if not request.get('all_data', False) and not self.superuser_request:
            modified_filters.append({'term': {'permissions._id': self.uid}})

        # Only return objects that have not been marked as deleted
        modified_filters.append({'term': {'deleted': False}})

        # Parse and "validate" search_string, allowed to be non-existent
        search_string = str(request.get('search_string', ''))


        if request_type == 'facet':
            # size is assumed 0 for facets
            return return_type, modified_filters, search_string, 0

        # Determine query size; if size == 'all', estimate the total for the return type
        size = self.request.params.get('size')
        if not size:
            size = request.get("size", 100)
        if size == 'all':
            size = self.search_size(return_type, filters=modified_filters)
        elif not isinstance(size, int):
            try:
                size = int(size)
            except ValueError:
                self.abort(400, 'Size must be an int or "all".')

        # Check that size is less than 10,000
        if int(size) > 10000:
            self.abort(400, "Request would return more than 10,000 results. Please add additional filters.")

        return return_type, modified_filters, search_string, size

    @require_login
    def aggregate_field_values(self):
        """
        Return list of type ahead values for a key given a value
        that the user has already started to type in for the value of
        a custom string field or a set of statistics if the field type is
        a number.
        """
        try:
            field_name = self.request.json_body['field_name']
        except (KeyError, ValueError):
            self.abort(400, 'Field name is required')
        filters = [{'term': {'deleted': False}}]
        if not self.superuser_request:
            filters.append({'term': {'permissions._id': self.uid}})
        try:
            field = config.es.get(index='data_explorer_fields', id=field_name, doc_type='flywheel_field')
        except TransportError as e:
            log.warning(e)
            self.abort(404, 'Could not find mapping for field {}.'.format(field_name))
        field_type = field['_source']['type']
        search_string = self.request.json_body.get('search_string', None)

        # Base query: match the partial search string, constrained by filters
        body = {
            "size": 0,
            "query": {
                "bool": {
                    "must" : {
                        "match" : { field_name : search_string}
                    },
                    "filter" : filters
                }
            }
        }
        if not filters:
            # TODO add non-user auth support (#865)
            body['query']['bool'].pop('filter')
        if search_string is None:
            body['query']['bool']['must'] = MATCH_ALL

        if field_type in ['string', 'boolean']:
            body['aggs'] = {
                "results" : {
                    "terms" : {
                        "field" : field_name + ".raw",
                        "size" : 15,
                        "missing": "null"
                    }
                }
            }

        # If it is a number (int, date, or some other type), return various statistics on the values of the field
        elif field_type in ['integer', 'float', 'date']:
            body['aggs'] = {
                "results" : {
                    "stats" : {
                        "field" : field_name
                    }
                }
            }
        else:
            self.abort(400, 'Aggregations are only allowed on string, integer, float, date, and boolean fields.')

        aggs = config.es.search(
            index='data_explorer',
            doc_type='flywheel',
            body=body
        )['aggregations']['results']
        return aggs

    @require_login
    def get_facets(self):
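        """Run FACET_QUERY scoped by the caller's filters and search string
        and return the aggregation buckets."""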

        _, filters, search_string, _ = self._parse_request(request_type='facet')

        facets_q = copy.deepcopy(FACET_QUERY)
        facets_q['query'] = self._construct_query(None, search_string, filters, 0)['query']

        # defensive: with return_type=None, _construct_query adds no aggs,
        # but drop any that may have leaked into the query clause
        facets_q['query'].pop('aggs', None)

        aggs = config.es.search(
            index='data_explorer',
            doc_type='flywheel',
            body=facets_q
        )['aggregations']

        # The session_age aggregation carries an extra filter that excludes
        # outliers (only ages between -1 and 100 years are shown).
        # Merge its result back into the session aggregation node.
        age_node = aggs.pop('session_age')
        aggs['by_session']['subject.age'] = age_node['subject.age']
        return {'facets': aggs}

    def search_size(self, return_type, filters=None):
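        """Estimate how many distinct containers of return_type match the
        given filters, using a cardinality aggregation."""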
        body = {
            "size": 0,
            "aggs" : {
                "count" : {
                    "cardinality" : {
                        "field" : return_type + "._id",
                        "precision_threshold": 100000
                    }
                }
            }
        }

        if filters:
            body["query"] = {
                "bool": {
                    "filter": filters
                }
            }
        size = config.es.search(
            index='data_explorer',
            doc_type='flywheel',
            body=body)['aggregations']['count']['value']
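        # Cardinality is an approximation; pad the estimate by 2% so that
        # 'size=all' requests do not truncate results.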
        size = int(size*1.02)
        return size

    def get_nodes(self):
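        """Return matching containers as {level, _id} node pairs, delegating
        to get_file_nodes for file results."""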

        return_type, filters, search_string, size = self._parse_request()
        if return_type == 'file':
            return self.get_file_nodes(return_type, filters, search_string)

        body = self._construct_query(return_type, search_string, filters, size)

        body['aggs']['by_container'].pop('aggs')
        body['_source'] = [return_type + "._id"]

        nodes = []
        results = config.es.search(
            index='data_explorer',
            doc_type='flywheel',
            body=body)['aggregations']['by_container']['buckets']

        for result in results:
            nodes.append({'level': return_type, '_id': result['key']})
        return {'nodes':nodes}

    def get_file_nodes(self, return_type, filters, search_string):
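        """Scroll through every matching file document and return
        {level, _id} node pairs."""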

        query = self._construct_file_query(return_type, filters, search_string)['query']

        nodes = []
        results = helpers.scan(client=config.es, query={'query': query}, scroll='5m', size=1000, index='data_explorer', doc_type='flywheel', _source=[return_type+'._id'])
        for result in results:
            nodes.append({'level': return_type, '_id': result['_source'][return_type]['_id']})
        return {'nodes':nodes}


    @require_login
    def search_fields(self):
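        """Match a partial field name against the data_explorer_fields index
        and return up to 15 candidate field mappings."""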
        field_query = self.request.json_body.get('field')

        es_query = {
            "size": 15,
            "query": {
                "match" : { "name" : field_query }
            }
        }
        try:
            es_results = config.es.search(
                index='data_explorer_fields',
                doc_type='flywheel_field',
                body=es_query
            )
        except TransportError as e:
            config.log.warning('Fields not yet indexed for search: {}'.format(e))
            return []

        results = []
        for result in es_results['hits']['hits']:
            results.append(result['_source'])

        return results


    @require_login
    def search(self):
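        """Main search endpoint: parse the request, run the constructed query,
        and return the results (optionally with facets)."""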
        return_type, filters, search_string, size = self._parse_request()

        results = self._run_query(self._construct_query(return_type, search_string, filters, size), return_type)

        if self.is_true('simple'):
            # return only each result's '_source' key
            return [x['_source'] for x in results]

        else:
            response = {'results': results}
            if self.is_true('facets'):
                response['facets'] = self.get_facets()
            return response


    ## CONSTRUCTING QUERIES ##

    def _construct_query(self, return_type, search_string, filters, size=100):
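        """Build the aggregation-based query: bucket hits by container id and
        keep only the top hit per bucket, so each matching container appears
        once. Files and collections use the exact query instead."""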
        if return_type in EXACT_CONTAINERS:
            return self._construct_exact_query(return_type, search_string, filters, size)

        query = {
            "size": 0,
            "query": {
                "bool": {
                  "must": {
                    "match": {
                      "_all": search_string
                    }
                  },
                  "filter": {
                    "bool" : {
                      "must" : filters
                    }
                  }
                }
            }
        }

        if return_type: # only searches have a return type, not facet queries
            query['aggs'] = {
                "by_container": {
                    "terms": {
                        "field": return_type+"._id",
                        "size": size
                    },
                    "aggs": {
                        "by_top_hit": {
                            "top_hits": {
                                "_source": SOURCE[return_type],
                                "size": 1
                            }
                        }
                    }
                }
            }


        # If no search_string was given, drop the match-on-_all clause
        if not search_string:
            query['query']['bool'].pop('must')

        # If no filters were given, drop the filter clause
        if not filters:
            query['query']['bool'].pop('filter')

        if not search_string and not filters:
            query['query'] = MATCH_ALL

        return query

    def _construct_exact_query(self, return_type, search_string, filters, size=100):
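        """Build a flat (non-aggregated) query for EXACT_CONTAINERS: hits are
        returned directly, filtered to the requested container_type."""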
        query = {
          "size": size,
          "_source": SOURCE[return_type],
          "query": {
            "bool": {
              "must": {
                "match": {
                  "_all": ""
                }
              },
              "filter": {
                "bool" : {
                  "must" : [{ "term" : {"container_type" : return_type}}]
                }
              }
            }
          }
        }

        if return_type == 'file':
            query['script_fields'] = {
                "info_exists" : INFO_EXISTS_SCRIPT
            }

        # Add search_string to "match on _all fields" query, otherwise remove unneeded logic
        if search_string:
            query['query']['bool']['must']['match']['_all'] = search_string
        else:
            query['query']['bool'].pop('must')

        # Add filters list to filter key on query if exists
        if filters:
            query['query']['bool']['filter']['bool']['must'].extend(filters)

        return query


    ## RUNNING QUERIES AND PROCESSING RESULTS ##

    def _run_query(self, es_query, result_type):
        try:
            results = config.es.search(
                index='data_explorer',
                doc_type='flywheel',
                body=es_query
            )
        except RequestError:
            self.abort(400, 'Unable to parse filters - invalid format.')

        return self._process_results(results, result_type)

    def _process_results(self, results, result_type):
        if result_type in EXACT_CONTAINERS:
            return self._process_exact_results(results, result_type)
        else:
            containers = results['aggregations']['by_container']['buckets']
            modified_results = []
            for c in containers:
                modified_results.append(c['by_top_hit']['hits']['hits'][0])
            return modified_results

    def _process_exact_results(self, results, result_type):
        results = results['hits']['hits']
        if result_type == 'file':

            # Note: At some point this would be better suited
            # as an indexed field rather than scripted on the fly
            for r in results:
                fields = r.pop('fields', {})
                r['_source']['file']['info_exists'] = fields.get('info_exists', [None])[0]

        return results

    ### Field mapping index helper functions
    @classmethod
    def _get_field_type(cls, field_type):
        if field_type in ['text', 'keyword']:
            return 'string'
        elif field_type in ['long', 'integer', 'short', 'byte']:
            return 'integer'
        elif field_type in ['double', 'float']:
            return 'float'
        elif field_type in ['date', 'boolean', 'object']:
            return field_type
        else:
            config.log.debug('Did not recognize field type {}, defaulting to string'.format(field_type))
            return 'string'

    @classmethod
    def _handle_properties(cls, properties, current_field_name):
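        """Recursively walk the ES mapping, derive a simplified type for each
        leaf field, decide whether it should be offered as a facet, and index
        the result into data_explorer_fields."""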

        ignore_fields = [
            '_all', 'dynamic_templates', 'analysis_reference', 'file_reference',
            'parent', 'container_type', 'origin', 'permissions', '_id',
            'project_has_template', 'hash'
        ]

        for field_name, field in properties.iteritems():

            # Ignore some fields
            if field_name in ignore_fields:
                continue

            elif 'properties' in field:
                new_curr_field = current_field_name+'.'+field_name if current_field_name != '' else field_name
                cls._handle_properties(field['properties'], new_curr_field)

            else:
                field_type = cls._get_field_type(field['type'])
                facet_status = False
                if field_type == 'object':
                    # empty objects don't get added
                    continue

                field_name = current_field_name+'.'+field_name if current_field_name != '' else field_name

                if field_type == 'string':
                    # if >85% of values fall in top 15 results, mark as a facet
                    body = {
                        "size": 0,
                        "aggs" : {
                            "results" : {
                                "terms" : {
                                    "field" : field_name + ".raw",
                                    "size" : 15
                                }
                            }
                        }
                    }

                    aggs = config.es.search(
                        index='data_explorer',
                        doc_type='flywheel',
                        body=body
                    )['aggregations']['results']

                    other_doc_count = aggs['sum_other_doc_count']
                    facet_doc_count = sum([bucket['doc_count'] for bucket in aggs['buckets']])
                    total_doc_count = other_doc_count+facet_doc_count

                    if other_doc_count == 0 and facet_doc_count > 0:
                        # All values fit in 15 or fewer buckets
                        facet_status = True
                    elif other_doc_count > 0 and facet_doc_count > 0 and (float(facet_doc_count)/total_doc_count) > 0.85:
                        # Greater than 85% of values fit in 15 or fewer buckets
                        facet_status = True
                    else:
                        # There are no values, or the values are too diverse
                        facet_status = False

                doc = {
                    'name':                 field_name,
                    'type':                 field_type,
                    'facet':                facet_status
                }

                doc_s = json.dumps(doc)
                config.es.index(index='data_explorer_fields', id=field_name, doc_type='flywheel_field', body=doc_s)

    @require_superuser
    def index_field_names(self):
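        """(Re)build the data_explorer_fields index from the current
        data_explorer mapping; 'hard-reset' drops the old index first."""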

        try:
            if not config.es.indices.exists('data_explorer'):
                self.abort(404, 'data_explorer index not yet available')
        except TransportError as e:
            self.abort(404, 'elastic search not available: {}'.format(e))

        # Sometimes we might want to clear out what is there...
        if self.is_true('hard-reset') and config.es.indices.exists('data_explorer_fields'):
            config.log.debug('Removing existing data explorer fields index...')
            try:
                config.es.indices.delete(index='data_explorer_fields')
            except ElasticsearchException as e:
                self.abort(500, 'Unable to clear data_explorer_fields index: {}'.format(e))

        # Check to see if fields index exists, if not - create it:
        if not config.es.indices.exists('data_explorer_fields'):
            request = {
                'settings': {
                    'number_of_shards': 1,
                    'number_of_replicas': 0,
                    'analysis' : ANALYSIS
                },
                'mappings': {
                    '_default_' : {
                        '_all' : {'enabled' : True},
                        'dynamic_templates': DYNAMIC_TEMPLATES
                    },
                    'flywheel': {}
                }
            }

            config.log.debug('creating data_explorer_fields index ...')
            try:
                config.es.indices.create(index='data_explorer_fields', body=request)
            except ElasticsearchException as e:
                self.abort(500, 'Unable to create data_explorer_fields index: {}'.format(e))

        try:
            mappings = config.es.indices.get_mapping(index='data_explorer', doc_type='flywheel')
            fw_mappings = mappings['data_explorer']['mappings']['flywheel']['properties']
        except (TransportError, KeyError):
            self.abort(404, 'Could not find mappings, exiting ...')

        self._handle_properties(fw_mappings, '')