toptal/chewy

View on GitHub
lib/chewy/index/import/routine.rb

Summary

Maintainability
A
0 mins
Test Coverage
module Chewy
  class Index
    module Import
      # This class performs the import routine for the options and objects given.
      #
      # 0. Create target and journal indexes if needed.
      # 1. Iterate over all the passed objects in batches.
      # 2. For each batch {#process} method is called:
      #   * creates a bulk request body;
      #   * appends journal entries for the current batch to the request body;
      #   * prepends a leftovers bulk to the request body, which is calculated
      #     basing on the previous iteration errors;
      #   * performs the bulk request;
      #   * composes new leftovers bulk for the next iteration basing on the response errors if `update_failover` is true;
      #   * appends the rest of unfixable errors to the instance level errors array.
      # 4. Perform the request for the last leftovers bulk if present using {#extract_leftovers}.
      # 3. Return the result errors array.
      #
      # At the moment, it tries to restore only from the partial document update errors in cases
      # when the document doesn't exist only if `update_failover` option is true. In order to
      # restore, it indexes such an objects completely on the next iteration.
      #
      # @see Chewy::Index::Import::ClassMethods#import
      class Routine
        BULK_OPTIONS = %i[
          suffix bulk_size
          refresh timeout fields pipeline
          consistency replication
          wait_for_active_shards routing _source _source_exclude _source_include
        ].freeze

        DEFAULT_OPTIONS = {
          refresh: true,
          update_fields: [],
          update_failover: true,
          batch_size: Chewy::Index::Adapter::Base::BATCH_SIZE
        }.freeze

        attr_reader :options, :parallel_options, :errors, :stats, :leftovers

        # Basically, processes passed options, extracting bulk request specific options.
        # @param index [Chewy::Index] chewy index
        # @param options [Hash] import options, see {Chewy::Index::Import::ClassMethods#import}
        def initialize(index, **options)
          @index = index
          @options = options
          @options.reverse_merge!(@index._default_import_options)
          @options.reverse_merge!(journal: Chewy.configuration[:journal])
          @options.reverse_merge!(DEFAULT_OPTIONS)
          @bulk_options = @options.slice(*BULK_OPTIONS)
          @parallel_options = @options.delete(:parallel)
          if @parallel_options && !@parallel_options.is_a?(Hash)
            @parallel_options = if @parallel_options.is_a?(Integer)
              {in_processes: @parallel_options}
            else
              {}
            end
          end
          @errors = []
          @stats = {}
          @leftovers = []
        end

        # Creates the journal index and the corresponding index if necessary.
        # @return [Object] whatever
        def create_indexes!
          Chewy::Stash::Journal.create if @options[:journal] && !Chewy.configuration[:skip_journal_creation_on_import]
          return if Chewy.configuration[:skip_index_creation_on_import]

          @index.create!(**@bulk_options.slice(:suffix)) unless @index.exists?
        end

        # The main process method. Converts passed objects to the bulk request body,
        # appends journal entries, performs this request and handles errors performing
        # failover procedures if applicable.
        #
        # @param index [Array<Object>] any acceptable objects for indexing
        # @param delete [Array<Object>] any acceptable objects for deleting
        # @return [true, false] the result of the request, true if no errors
        def process(index: [], delete: [])
          bulk_builder = BulkBuilder.new(@index, to_index: index, delete: delete, fields: @options[:update_fields])
          bulk_body = bulk_builder.bulk_body

          if @options[:journal]
            journal_builder = JournalBuilder.new(@index, to_index: index, delete: delete)
            bulk_body.concat(journal_builder.bulk_body)
          end

          bulk_body.unshift(*flush_leftovers)

          perform_bulk(bulk_body) do |response|
            @leftovers = extract_leftovers(response, bulk_builder.index_objects_by_id)
            @stats[:index] = @stats[:index].to_i + index.count if index.present?
            @stats[:delete] = @stats[:delete].to_i + delete.count if delete.present?
          end
        end

        # Performs a bulk request for the passed body.
        #
        # @param body [Array<Hash>] a standard bulk request body
        # @return [true, false] the result of the request, true if no errors
        def perform_bulk(body)
          response = bulk.perform(body)
          yield response if block_given?
          Chewy.wait_for_status
          @errors.concat(response)
          response.blank?
        end

      private

        def flush_leftovers
          leftovers = @leftovers
          @leftovers = []
          leftovers
        end

        def extract_leftovers(errors, index_objects_by_id)
          return [] unless @options[:update_fields].present? && @options[:update_failover] && errors.present?

          failed_partial_updates = errors.select do |item|
            item.keys.first == 'update' && item.values.first['error']['type'] == 'document_missing_exception'
          end
          failed_ids_hash = failed_partial_updates.index_by { |item| item.values.first['_id'].to_s }
          failed_ids_for_reimport = failed_ids_hash.keys & index_objects_by_id.keys
          errors_to_cleanup = failed_ids_hash.values_at(*failed_ids_for_reimport)
          errors_to_cleanup.each { |error| errors.delete(error) }

          failed_objects = index_objects_by_id.values_at(*failed_ids_for_reimport)
          BulkBuilder.new(@index, to_index: failed_objects).bulk_body
        end

        def bulk
          @bulk ||= BulkRequest.new(@index, **@bulk_options)
        end
      end
    end
  end
end