doximity/es-elasticity

View on GitHub
lib/elasticity/search.rb

Summary

Maintainability
A
3 hrs
Test Coverage
module Elasticity
  module Search
    # Convenience constructor: wraps the search parameters in a Search::Definition and
    # returns a Search::Facade that executes it through +client+.
    #
    # search_args defaults to an empty hash and is handed to the definition untouched.
    def self.build(client, index_name, document_types, body, search_args = {})
      Search::Facade.new(
        client,
        Search::Definition.new(index_name, document_types, body, search_args)
      )
    end

    # Elasticity::Search::Definition is a value object that encapsulates all the data specific
    # to one Elasticsearch search: the index name, the document types, the request body and any
    # extra top-level search arguments.
    class Definition
      attr_accessor :index_name, :document_types, :body

      # index_name     - value sent as :index in every generated argument hash
      # document_types - stored and exposed through the accessor; not used by the argument
      #                  builders in this class
      # body           - Hash with the search body. A deep-symbolized copy is stored, so the
      #                  hash passed in by the caller is left untouched.
      # search_args    - extra top-level arguments merged into search / msearch requests
      def initialize(index_name, document_types, body, search_args = {})
        @index_name     = index_name
        @document_types = document_types
        @body           = body.deep_symbolize_keys
        @search_args    = search_args
      end

      # Returns a new Definition whose body is the current body deep-merged with body_changes.
      # The receiver is not modified, and the extra search arguments are carried over to the
      # new definition.
      def update(body_changes)
        self.class.new(@index_name, @document_types, @body.deep_merge(body_changes), @search_args)
      end

      # Arguments for a count request: the index plus, when present, only the :query part of
      # the body. The :body key is omitted entirely when there is no query.
      def to_count_args
        { index: @index_name }.tap do |args|
          body = @body.slice(:query)
          args[:body] = body if body.present?
        end
      end

      # Arguments for a search request. :index and :body always come from this definition,
      # overriding same-named keys in the extra search arguments.
      def to_search_args
        @search_args.merge({ index: @index_name, body: @body })
      end

      # Arguments for one entry of a multi-search request. The extra search arguments and the
      # body are flattened into a single :search hash; body keys win on conflict.
      def to_msearch_args
        search_body = @search_args.merge(@body)

        { index: @index_name, search: search_body }
      end
    end

    # Elasticity::Search::Facade provides a simple interface for defining a search and provides
    # different ways of executing it against Elasticsearch. This is usually the main entry point
    # for search.
    #
    # The lazy result objects are memoized per distinct set of arguments: calling a helper twice
    # with the same arguments returns the same object, while a different mapper or different
    # arguments produce their own result instead of a stale one from an earlier call.
    class Facade
      attr_accessor :search_definition

      # Creates a new facade for the given search definition, providing a set of helper methods
      # to trigger different types of searches and result interpretation.
      def initialize(client, search_definition)
        @client            = client
        @search_definition = search_definition
      end

      # Performs the search using the default search type, returning an iterator that will yield
      # hash representations of the documents.
      #
      # search_args - extra arguments handed to LazySearch for the search request.
      def document_hashes(search_args = {})
        @document_hashes_cache ||= {}
        @document_hashes_cache[search_args] ||= LazySearch.new(@client, @search_definition, search_args)
      end

      # Performs the search using the default search type, returning an iterator that will yield
      # each document converted using the provided mapper (any object responding to #call).
      #
      # search_args - extra arguments handed to LazySearch for the search request.
      def documents(mapper, search_args = {})
        @documents_cache ||= {}
        @documents_cache[[mapper, search_args]] ||=
          LazySearch.new(@client, @search_definition, search_args) { |hit| mapper.call(hit) }
      end

      # Iterates over all matching documents in batches through the scroll API (see ScanCursor),
      # converting each hit with the provided mapper. Options are forwarded to ScanCursor.
      #
      # NOTE(review): this used to be documented as using the "scan" search type, which discards
      # the sort option; ScanCursor requests `query_then_fetch`, so confirm whether sorting still
      # is ignored.
      #
      # More info: http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/scan-scroll.html
      def scan_documents(mapper, **options)
        @scan_documents_cache ||= {}
        @scan_documents_cache[[mapper, options]] ||=
          ScanCursor.new(@client, @search_definition, mapper, **options)
      end

      # Performs the search fetching only document ids and uses them to load ActiveRecord objects
      # from the provided relation. It returns a proxy for the relation matching the objects found
      # on Elasticsearch. Not memoized: every call builds a new ActiveRecordProxy.
      def active_records(relation)
        ActiveRecordProxy.new(@client, @search_definition, relation)
      end
    end

    # Search whose request is deferred: nothing is sent to the client until the response is
    # first needed, after which the same response is reused. Collection and pagination
    # messages are delegated to the Search::Results built from that response.
    class LazySearch
      include Enumerable

      delegate :each, :size, :length, :[], :+, :-, :&, :|, :total, :per_page,
        :total_pages, :current_page, :next_page, :previous_page, :aggregations, to: :search_results

      attr_accessor :search_definition

      # The optional block is kept as the mapper applied to every hit.
      def initialize(client, search_definition, search_args, &mapper)
        @client, @search_definition = client, search_definition
        @mapper, @search_args       = mapper, search_args
      end

      # True when the reported total is zero.
      def empty?
        hit_total = total
        hit_total == 0
      end

      # Same answer as #empty?.
      def blank?
        empty?
      end

      # The "suggest" section of the response; an empty hash is stored into the response
      # (and returned) when the section is missing.
      def suggestions
        payload = response
        payload["suggest"] = {} unless payload["suggest"]
        payload["suggest"]
      end

      # Issues a count request. The definition's count arguments take precedence; +args+ only
      # fills in keys the definition does not provide.
      def count(args = {})
        count_args = @search_definition.to_count_args.reverse_merge(args)
        @client.count(count_args)["count"]
      end

      # Memoized Search::Results wrapping the response.
      def search_results
        return @search_results if @search_results

        @search_results = Search::Results.new(response, @search_definition.body, @mapper)
      end

      private

      # Executes the search once. The definition's search arguments take precedence over the
      # extra arguments given at construction time.
      def response
        unless defined?(@response)
          request   = @search_definition.to_search_args.reverse_merge(@search_args)
          @response = @client.search(request)
        end
        @response
      end
    end

    # Iterates over every hit of a search using the scroll API, one batch at a time.
    # The initial search response is memoized; scroll requests are issued again on every
    # iteration.
    class ScanCursor
      include Enumerable

      # client            - object responding to #search and #scroll
      # search_definition - object responding to #to_search_args and #body
      # mapper            - passed to Search::Results to convert each hit
      # size:             - value sent as :size on the initial search (default 100)
      # scroll:           - value sent as :scroll on the search and scroll requests (default "1m")
      def initialize(client, search_definition, mapper, size: 100, scroll: "1m")
        @client            = client
        @search_definition = search_definition
        @mapper            = mapper
        @size              = size
        @scroll            = scroll
      end

      # True when the reported total is zero.
      def empty?
        total == 0
      end

      # Same answer as #empty?.
      def blank?
        empty?
      end

      # Total reported by the initial search response. The response may carry the total either
      # as a plain value or as a hash with a "value" key; both shapes are supported.
      def total
        res = search["hits"]["total"]
        res.is_a?(::Hash) ? res["value"] : res
      end

      # Yields each batch (a Search::Results) in turn.
      # Returns an Enumerator when called without a block.
      def each_batch
        return enum_for(:each_batch) unless block_given?

        enumerator.each do |group|
          yield(group)
        end
      end

      # Yields every document of every batch.
      # Returns an Enumerator when called without a block.
      def each
        return enum_for(:each) unless block_given?

        enumerator.each do |group|
          group.each { |doc| yield(doc) }
        end
      end

      private

      # Enumerator producing one Search::Results per response: first the initial search, then
      # scroll responses until one comes back with no hits.
      def enumerator
        Enumerator.new do |y|
          response = search
          # Push the first set of results before requesting the second set
          y << Search::Results.new(response, @search_definition.body, @mapper)
          loop do
            # The scroll id is sent both as a top-level argument and inside the body.
            response = @client.scroll(scroll_id: response["_scroll_id"], scroll: @scroll, body: { scroll_id: response["_scroll_id"] })
            break if response["hits"]["hits"].empty?

            y << Search::Results.new(response, @search_definition.body, @mapper)
          end
        end
      end

      # Initial search request (memoized), carrying the scroll settings.
      def search
        return @search if defined?(@search)
        args    = @search_definition.to_search_args
        args    = args.merge(search_type: :query_then_fetch, size: @size, scroll: @scroll)
        @search = @client.search(args)
      end
    end

    # Runs the search fetching ids only (`_source: false`) and forwards every unknown message
    # to the ActiveRecord relation restricted to, and ordered by, the ids that came back.
    class ActiveRecordProxy
      # Builds the Relation wrapper for a response: the given relation filtered to the hit ids
      # and ordered to follow the order of the hits, or `relation.none` when there are no ids.
      #
      # NOTE(review): the ordering relies on the SQL FIELD() function — confirm every database
      # adapter in use supports it.
      def self.map_response(relation, body, response)
        ids = response["hits"]["hits"].map { |hit| hit["_id"] }
        return Relation.new(relation.none, body, response) unless ids.any?

        connection  = relation.connection
        table_name  = connection.quote_column_name(relation.table_name)
        primary_key = connection.quote_column_name(relation.klass.primary_key)
        id_col      = "#{table_name}.#{primary_key}"
        id_vals     = ids.map { |id| connection.quote(id) }

        filtered = relation.where("#{id_col} IN (?)", ids)
        ordered  = filtered.order(Arel.sql("FIELD(#{id_col}, #{id_vals.join(',')})"))
        Relation.new(ordered, body, response)
      end

      # Proxy around the filtered relation that additionally answers the pagination and
      # aggregation messages from a Results object built for the same response.
      class Relation < ActiveSupport::ProxyObject

        delegate :total, :per_page, :total_pages, :current_page, :next_page,
          :previous_page, :aggregations, to: :@results

        # The second argument is handed straight to Results.new as its body argument
        # (map_response passes the search body here).
        def initialize(relation, search_definition, response)
          @relation          = relation
          @response          = response
          @search_definition = search_definition
          @results           = Results.new(response, search_definition)
        end

        # Everything that is not delegated above goes to the wrapped relation.
        def method_missing(method_name, *arguments, &blk)
          @relation.public_send(method_name, *arguments, &blk)
        end

        def pretty_print(pp)
          pp.object_group(self) { pp.text " #{@relation.to_sql}" }
        end

        def inspect
          "#<#{self.class}: #{@relation.to_sql}>"
        end
      end

      # The definition is replaced by a copy with `_source: false` merged into its body.
      def initialize(client, search_definition, relation)
        @client            = client
        @relation          = relation
        @search_definition = search_definition.update(_source: false)
      end

      # Memoized hash with the raw total and the suggestions taken from the response.
      def metadata
        return @metadata if @metadata

        raw       = response
        @metadata = { total: raw["hits"]["total"], suggestions: raw["suggest"] || {} }
      end

      # Total reported by the response; when it is a hash, its "value" entry is returned.
      def total
        raw_total = metadata[:total]
        raw_total.is_a?(::Hash) ? raw_total["value"] : raw_total
      end

      def suggestions
        metadata[:suggestions]
      end

      # Any other message triggers the search (if it has not run yet) and is sent to the
      # filtered relation.
      def method_missing(method_name, *arguments, &blk)
        filtered_relation.public_send(method_name, *arguments, &blk)
      end

      private

      # Search response, memoized.
      def response
        return @response if @response

        @response = @client.search(@search_definition.to_search_args)
      end

      # Relation wrapper for the response, built once.
      def filtered_relation
        unless defined?(@filtered_relation)
          @filtered_relation = ActiveRecordProxy.map_response(@relation, @search_definition.body, response)
        end
        @filtered_relation
      end
    end

    # BasicObject-based proxy that pairs a search with a document class: the document class is
    # supplied as the mapper whenever documents are requested, and any unknown message is
    # answered by the mapped documents.
    class DocumentProxy < BasicObject
      def initialize(search, document_klass)
        @search, @document_klass = search, document_klass
      end

      delegate :search_definition, :active_records, to: :@search

      # Documents of the search, mapped with the document class.
      def documents(search_args = {})
        mapper = @document_klass
        @search.documents(mapper, search_args)
      end

      # Scrolling variant of #documents; options are forwarded unchanged.
      def scan_documents(**options)
        mapper = @document_klass
        @search.scan_documents(mapper, **options)
      end

      # Sends anything else to the result of #documents (called with its default arguments).
      def method_missing(method_name, *args, &block)
        target = documents
        target.public_send(method_name, *args, &block)
      end
    end

    # Wraps a raw search response, exposing the hits (optionally converted by a mapper) as an
    # Enumerable together with total, aggregation and pagination helpers. Any message not
    # defined here is forwarded to the underlying array of documents.
    class Results < ActiveSupport::ProxyObject
      include ::Enumerable

      # #each is defined explicitly below, so it is not part of the delegated list.
      delegate :size, :length, :[], :+, :-, :&, :|, to: :@documents

      # Page size assumed when the search body has no :size.
      DEFAULT_SIZE = 10

      # response - Hash-like response; hits are read from response["hits"]["hits"]
      # body     - search body, read with the symbol keys :size and :from for pagination
      # mapper   - optional object responding to #call, applied to every hit
      def initialize(response, body, mapper = nil)
        @response = response
        @body = body
        hits = @response["hits"]["hits"]
        @documents = mapper.nil? ? hits : hits.map { |hit| mapper.call(hit) }
      end

      def method_missing(name, *args, &block)
        @documents.public_send(name, *args, &block)
      end

      def each(&block)
        @documents.each(&block)
      end

      # The "aggregations" section of the response; an empty hash is stored into the response
      # (and returned) when the section is missing.
      def aggregations
        @response["aggregations"] ||= {}
      end

      # Total reported by the response; when it is a hash, its "value" entry is returned.
      def total
        res = @response["hits"]["total"]
        res.is_a?(::Hash) ? res["value"] : res
      end
      alias_method :total_entries, :total

      # for pagination
      #
      # Returns 0 when the page size is zero (e.g. a body with `size: 0`), instead of raising
      # FloatDomainError from rounding Infinity/NaN.
      def total_pages
        page_size = per_page.to_f
        return 0 if page_size.zero?

        (total.to_f / page_size).ceil
      end

      # for pagination
      def per_page
        @body[:size] || DEFAULT_SIZE
      end

      # for pagination
      #
      # Returns 1 when there is no :from offset, or when the page size is zero (which would
      # otherwise raise ZeroDivisionError).
      def current_page
        offset = @body[:from]
        return 1 if offset.nil?

        page_size = per_page
        return 1 if page_size == 0

        offset / page_size + 1
      end

      # Number of the following page, or nil on the last page.
      def next_page
        current_page < total_pages ? (current_page + 1) : nil
      end

      # Number of the preceding page, or nil on the first page.
      def previous_page
        current_page > 1 ? (current_page - 1) : nil
      end
    end
  end
end