ignatiusreza/elasticsearch-model-transactional_callbacks

lib/elasticsearch/model/transactional_callbacks/bulk_indexing_job.rb

# frozen_string_literal: true

module Elasticsearch
  module Model
    module TransactionalCallbacks
      ##
      # Background job that handles requests to index, update, or delete documents asynchronously
      #
      #   Elasticsearch::Model::TransactionalCallbacks::BulkIndexingJob.perform_later(
      #     document_type: {
      #       index: [{ _id: document.id }],
      #       update: [{ _id: document.id }],
      #       delete: [{ _id: document.id }],
      #     }
      #   )
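      #
      # Each top-level key is the underscored model name; it is resolved back
      # to the model class with +camelcase.constantize+ (e.g. +user+ => +User+).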
      #
      class BulkIndexingJob < ::ActiveJob::Base
        queue_as :default

        def perform(indexables)
          indexables.each do |document_type, action_map|
            klass = document_type.to_s.camelcase.constantize
            body = transform_batches(klass, action_map)

            response = bulk_index klass, body

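            # +bulk_index+ returns nil when there was nothing to send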
            ::Rails.logger.error "[ELASTICSEARCH] Bulk request failed: #{response['items']}" if response&.dig('errors')
          end
        end

        private

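          # Turns the per-action id lists into an array of bulk API entries.
          # Indexed/updated records are loaded from the database in a single
          # query; deletes need no lookup, since those rows may already be gone.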
          def transform_batches(klass, action_map)
            reverse_map = build_reverse_map(action_map)
            resources = klass.where id: reverse_map.keys

            preload(resources).find_each.map { |resource|
              action, option = reverse_map[resource.id]

              send "transform_#{action}", resource, option
            } + action_map.fetch(:delete, []).map { |option|
              transform_delete(option)
            }
          end

          # Maps each non-delete document id back to its [action, option]
          # pair, so records loaded from the database can be matched to the
          # bulk action awaiting them.
          def build_reverse_map(action_map)
            action_map.each_with_object({}) { |(action, options), memo|
              next if action == :delete

              options.each do |option|
                memo[option[:_id]] = [action, option]
              end
            }
          end

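          # A model can opt into eager loading of associations during bulk
          # indexing by defining a +preload_for_import+ scope. A minimal
          # sketch, with hypothetical association names:
          #
          #   class User < ApplicationRecord
          #     scope :preload_for_import, -> { preload(:profile, :posts) }
          #   end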
          def preload(resources)
            resources.respond_to?(:preload_for_import) ? resources.preload_for_import : resources
          end

          def transform_index(resource, option)
            { index: option.merge(data: to_indexed_json(resource)) }
          end

          # Elasticsearch does support the update operation in its bulk API,
          # but a bulk update fails when it targets a missing document, while
          # index works for both new and existing documents.
          #
          # Because of this, we use index for updates as well, avoiding a race
          # condition where a document is updated immediately after it is
          # created, before Elasticsearch has become aware of it.
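          #
          # Illustrative result (attribute names hypothetical; +data+ comes
          # from +as_indexed_json+):
          #
          #   transform_update(user, { _id: 1 })
          #   #=> { index: { _id: 1, data: { name: 'John' } } }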
          alias transform_update transform_index

          def transform_delete(option)
            { delete: option }
          end

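          # Prefers a model-level +as_indexed_json+ when one is defined,
          # falling back to elasticsearch-model's default serialization.
          # An override might look like this (attribute names hypothetical):
          #
          #   def as_indexed_json(_options = {})
          #     { name: name, email: email }
          #   end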
          def to_indexed_json(resource)
            return resource.as_indexed_json if resource.respond_to?(:as_indexed_json)

            resource.__elasticsearch__.as_indexed_json
          end

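          # Sends the whole batch in a single bulk request. +body+ is an array
          # of action entries, e.g. (ids illustrative):
          #
          #   [{ index: { _id: 1, data: { ... } } }, { delete: { _id: 2 } }]
          #
          # Returns nil when there is nothing to send.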
          def bulk_index(klass, body)
            return if body.blank?

            klass.__elasticsearch__.client.bulk(
              index: klass.index_name,
              type: klass.document_type,
              body: body
            )
          end
      end
    end
  end
end