toptal/chewy

View on GitHub
lib/chewy/rake_helper.rb

Summary

Maintainability
B
4 hrs
Test Coverage
module Chewy
  module RakeHelper
    # Notification subscriber that prints one summary line per import event,
    # followed by a breakdown of any errors carried in the payload.
    #
    # Called with the output IO first (it is curried in by
    # +subscribed_task_stats+), then the event name, start time, finish time,
    # event id and payload. The name and id are ignored.
    IMPORT_CALLBACK = lambda do |output, _name, start, finish, _id, payload|
      # Round up so that sub-second imports are still reported as 1s.
      elapsed = (finish - start).ceil
      summary = payload.fetch(:import, {}).map { |operation, total| "#{operation} #{total}" }.join(', ')
      output.puts "  Imported #{payload[:index]} in #{human_duration(elapsed)}, stats: #{summary}"

      # :errors is optional; when present it maps action => { message => documents }.
      payload[:errors]&.each do |action, failures|
        output.puts "    #{action.to_s.humanize} errors:"
        failures.each do |message, documents|
          output.puts "      `#{message}`"
          output.puts "        on #{documents.count} documents: #{documents}"
        end
      end
    end

    # Notification subscriber that prints which indexes a journal batch is
    # being applied to, how many entries it holds and the current stage.
    #
    # Only the output IO (first argument) and the payload (last argument)
    # are used; the four notification arguments in between are ignored.
    JOURNAL_CALLBACK = lambda do |output, _, _, _, _, payload|
      groups = payload[:groups]
      entries = groups.values.sum(&:size)
      indexes = groups.keys.sort_by(&:derivable_name)
      output.puts "  Applying journal to #{indexes}, #{entries} entries, stage #{payload[:stage]}"
    end

    # ENV variable names picked up by +delete_by_query_options_from_env+;
    # every other key of the passed env is ignored.
    DELETE_BY_QUERY_OPTIONS = %w[WAIT_FOR_COMPLETION REQUESTS_PER_SECOND SCROLL_SIZE].freeze
    # Values (compared after downcasing) that turn WAIT_FOR_COMPLETION into
    # +false+; anything else is treated as +true+.
    FALSE_VALUES = %w[0 f false off].freeze

    class << self
      # Performs zero-downtime reindexing of all documents for the selected
      # indexes. A warning is printed first when the journal index is missing.
      #
      # @example
      #   Chewy::RakeHelper.reset # resets everything
      #   Chewy::RakeHelper.reset(only: 'cities') # resets only CitiesIndex
      #   Chewy::RakeHelper.reset(only: ['cities', CountriesIndex]) # resets CitiesIndex and CountriesIndex
      #   Chewy::RakeHelper.reset(except: CitiesIndex) # resets everything, but CitiesIndex
      #   Chewy::RakeHelper.reset(only: ['cities', 'countries'], except: CitiesIndex) # resets only CountriesIndex
      #
      # @param only [Array<Chewy::Index, String>, Chewy::Index, String] index or indexes to reset; all the indexes defined in the app when omitted
      # @param except [Array<Chewy::Index, String>, Chewy::Index, String] index or indexes to leave out
      # @param parallel [true, Integer, Hash] any acceptable parallel options for import
      # @param output [IO] output io for logging
      # @return [Array<Chewy::Index>] indexes that were reset
      def reset(only: nil, except: nil, parallel: nil, output: $stdout)
        warn_missing_index(output)

        subscribed_task_stats(output) do
          targets = indexes_from(only: only, except: except)
          # `each` returns its receiver, so the block yields the list of processed indexes.
          targets.each { |target| reset_one(target, output, parallel: parallel) }
        end
      end

      # Performs zero-downtime reindexing, but only for those selected indexes
      # whose specification has changed. Unchanged indexes are reported and
      # skipped.
      #
      # @example
      #   Chewy::RakeHelper.upgrade # considers everything
      #   Chewy::RakeHelper.upgrade(only: 'cities') # considers only CitiesIndex
      #   Chewy::RakeHelper.upgrade(only: ['cities', CountriesIndex]) # considers CitiesIndex and CountriesIndex
      #   Chewy::RakeHelper.upgrade(except: CitiesIndex) # considers everything, but CitiesIndex
      #   Chewy::RakeHelper.upgrade(only: ['cities', 'countries'], except: CitiesIndex) # considers only CountriesIndex
      #
      # @param only [Array<Chewy::Index, String>, Chewy::Index, String] index or indexes to consider; all the indexes defined in the app when omitted
      # @param except [Array<Chewy::Index, String>, Chewy::Index, String] index or indexes to leave out
      # @param parallel [true, Integer, Hash] any acceptable parallel options for import
      # @param output [IO] output io for logging
      # @return [Array<Chewy::Index>] indexes that were actually reset
      def upgrade(only: nil, except: nil, parallel: nil, output: $stdout)
        warn_missing_index(output)

        subscribed_task_stats(output) do
          candidates = indexes_from(only: only, except: except)
          # Collect the changed set up front, before any index is reset.
          outdated = candidates.select { |candidate| candidate.specification.changed? }

          if outdated.empty?
            output.puts 'No index specification was changed'
          else
            candidates.each do |candidate|
              if outdated.include?(candidate)
                reset_one(candidate, output, parallel: parallel)
              else
                output.puts "Skipping #{candidate}, the specification didn't change"
              end
            end
          end

          outdated
        end
      end

      # Runs a full import for every selected index that already exists.
      # Indexes that do not exist are reported and skipped.
      #
      # @example
      #   Chewy::RakeHelper.update # updates everything
      #   Chewy::RakeHelper.update(only: 'places') # updates only PlacesIndex
      #   Chewy::RakeHelper.update(except: PlacesIndex) # updates everything, but PlacesIndex
      #
      # @param only [Array<Chewy::Index, String>, Chewy::Index, String] indexes to update; all the indexes defined in the app when omitted
      # @param except [Array<Chewy::Index, String>, Chewy::Index, String] indexes to leave out
      # @param parallel [true, Integer, Hash] any acceptable parallel options for import
      # @param output [IO] output io for logging
      # @return [Array<Chewy::Index>] indexes that were actually updated
      def update(only: nil, except: nil, parallel: nil, output: $stdout)
        subscribed_task_stats(output) do
          indexes_from(only: only, except: except).select do |index|
            unless index.exists?
              output.puts "Skipping #{index}, it does not exists (use rake chewy:reset[#{index.derivable_name}] to create and update it)"
              next false
            end

            output.puts "Updating #{index}"
            index.import(parallel: parallel)
            # The index counts as updated regardless of what import returned.
            true
          end
        end
      end

      # Synchronizes every selected index and reports missing/outdated
      # documents together with the time each synchronization took.
      #
      # @example
      #   Chewy::RakeHelper.sync # synchronizes everything
      #   Chewy::RakeHelper.sync(only: 'places') # synchronizes only PlacesIndex
      #   Chewy::RakeHelper.sync(except: PlacesIndex) # synchronizes everything, but PlacesIndex
      #
      # @param only [Array<Chewy::Index, String>, Chewy::Index, String] indexes to synchronize; all the indexes defined in the app when omitted
      # @param except [Array<Chewy::Index, String>, Chewy::Index, String] indexes to leave out
      # @param parallel [true, Integer, Hash] any acceptable parallel options for sync
      # @param output [IO] output io for logging
      # @return [Array<Chewy::Index>] indexes whose sync result reported a positive :count
      def sync(only: nil, except: nil, parallel: nil, output: $stdout)
        subscribed_task_stats(output) do
          indexes_from(only: only, except: except).select do |index|
            output.puts "Synchronizing #{index}"
            unless index.supports_outdated_sync?
              output.puts "  #{index} doesn't support outdated synchronization"
            end

            started_at = Time.now
            result = index.sync(parallel: parallel)
            # Falsy both when sync returned nothing and when nothing had to be synced.
            synced = result && result[:count].positive?

            if synced
              output.puts "  Missing documents: #{result[:missing]}" if result[:missing].present?
              output.puts "  Outdated documents: #{result[:outdated]}" if result[:outdated].present?
            elsif result
              output.puts "  Skipping #{index}, up to date"
            else
              output.puts "  Something went wrong with the #{index} synchronization"
            end

            output.puts "  Took #{human_duration(Time.now - started_at)}"
            synced
          end
        end
      end

      # Applies journal entries created after the given time, for the selected
      # indexes or for all of them when no filter is passed.
      #
      # @example
      #   Chewy::RakeHelper.journal_apply(time: 1.minute.ago) # applies entries created for the last minute
      #   Chewy::RakeHelper.journal_apply(time: 1.minute.ago, only: 'places') # applies only PlacesIndex entries created for the last minute
      #   Chewy::RakeHelper.journal_apply(time: 1.minute.ago, except: PlacesIndex) # applies everything, but PlacesIndex, entries created for the last minute
      #
      # @param time [Time, DateTime] use only journal entries created after this time (required)
      # @param only [Array<Chewy::Index, String>, Chewy::Index, String] indexes to process; all the indexes defined in the app when omitted
      # @param except [Array<Chewy::Index, String>, Chewy::Index, String] indexes to leave out
      # @param output [IO] output io for logging
      # @raise [ArgumentError] when +time+ is not given
      # @return [nil] NOTE(review): the block ends with a conditional +puts+, so
      #   no list of indexes is returned — confirm callers do not rely on one
      def journal_apply(time: nil, only: nil, except: nil, output: $stdout)
        raise ArgumentError, 'Please specify the time to start with' unless time

        subscribed_task_stats(output) do
          output.puts "Applying journal entries created after #{time}"
          journal = Chewy::Journal.new(journal_indexes_from(only: only, except: except))
          applied = journal.apply(time)
          output.puts 'No journal entries were created after the specified time' if applied.zero?
        end
      end

      # Removes journal records created before the given time (or all of them
      # when no time is passed) for the selected indexes or all of them.
      #
      # @example
      #   Chewy::RakeHelper.journal_clean # cleans everything
      #   Chewy::RakeHelper.journal_clean(time: 1.minute.ago) # leaves only entries created for the last minute
      #   Chewy::RakeHelper.journal_clean(only: 'places') # cleans only PlacesIndex entries
      #   Chewy::RakeHelper.journal_clean(except: PlacesIndex) # cleans everything, but PlacesIndex entries
      #
      # @param time [Time, DateTime] clean all the journal entries created before this time
      # @param only [Array<Chewy::Index, String>, Chewy::Index, String] indexes to process; all the indexes defined in the app when omitted
      # @param except [Array<Chewy::Index, String>, Chewy::Index, String] indexes to leave out
      # @param delete_by_query_options [Hash] forwarded as-is to the journal's +clean+ call
      # @param output [IO] output io for logging
      # @return [nil] NOTE(review): the block ends with +puts+, so no list of
      #   indexes is returned — confirm callers do not rely on one
      def journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout)
        subscribed_task_stats(output) do
          output.puts "Cleaning journal entries created before #{time}" if time
          journal = Chewy::Journal.new(journal_indexes_from(only: only, except: except))
          response = journal.clean(time, delete_by_query_options: delete_by_query_options)

          summary =
            if response.key?('task')
              # A 'task' key means the cleanup was only scheduled, so there is no count yet.
              "Task to cleanup the journal has been created, #{response['task']}"
            else
              # The count is either top-level or nested under _indices/_all.
              deleted = response['deleted'] || response['_indices']['_all']['deleted']
              "Cleaned up #{deleted} journal entries"
            end
          output.puts summary
        end
      end

      # Creates the journal index via +Chewy::Stash::Journal.create!+ while
      # task statistics are being collected.
      #
      # @example
      #   Chewy::RakeHelper.journal_create # creates journal
      #
      # @param output [IO] output io for logging
      # @return presumably whatever +Chewy::Stash::Journal.create!+ returns — TODO confirm
      def journal_create(output: $stdout)
        subscribed_task_stats(output) { Chewy::Stash::Journal.create! }
      end

      # Eager loads the application and returns every index defined in it,
      # leaving out the internal Chewy::Stash::Journal and
      # Chewy::Stash::Specification indexes.
      #
      # @return [Array<Chewy::Index>] indexes found
      def all_indexes
        Chewy.eager_load!
        defined_indexes = Chewy::Index.descendants
        defined_indexes - [Chewy::Stash::Journal, Chewy::Stash::Specification]
      end

      # Reindexes data from the source index into the destination index by
      # delegating to +Chewy::Index.reindex+.
      #
      # @example
      #   Chewy::RakeHelper.reindex(source: 'users_index', dest: 'cities_index') # reindex data from 'users_index' index to 'cities_index'
      #
      # @param source [String] name of the index to read from
      # @param dest [String] name of the index to write to
      # @param output [IO] output io for logging
      def reindex(source:, dest:, output: $stdout)
        subscribed_task_stats(output) do
          announcement = "Source index is #{source}\nDestination index is #{dest}"
          output.puts announcement
          Chewy::Index.reindex(source: source, dest: dest)
          output.puts "#{source} index successfully reindexed with #{dest} index data"
        end
      end

      # Updates the mapping of an existing index: resolves +name+ to its index
      # class and calls +update_mapping+ on it. Per the original notes this is
      # meant for adding new fields or changing the search settings of
      # existing ones.
      #
      # @example
      #   Chewy::RakeHelper.update_mapping(name: 'cities') # updates the mapping of CitiesIndex
      #
      # @param name [String, Chewy::Index] index name (or index class) to update
      # @param output [IO] output io for logging
      def update_mapping(name:, output: $stdout)
        subscribed_task_stats(output) do
          output.puts "Index name is #{name}"
          index = normalize_index(name)
          index.update_mapping
          output.puts "#{name} index successfully updated"
        end
      end

      # Extracts the options needed to run the journal cleanup asynchronously
      # from an ENV-like hash. Only the keys listed in DELETE_BY_QUERY_OPTIONS
      # are read; they are downcased, symbolized and their values type-cast.
      # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
      #
      # @example
      #   Chewy::RakeHelper.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => 'false','REQUESTS_PER_SECOND' => '10','SCROLL_SIZE' => '5000'})
      #   # => { wait_for_completion: false, requests_per_second: 10.0, scroll_size: 5000 }
      #
      # @param env [#slice] ENV or a Hash with String keys and String values
      # @return [Hash{Symbol => Boolean, Float, Integer}] parsed options
      def delete_by_query_options_from_env(env)
        env.slice(*DELETE_BY_QUERY_OPTIONS).each_with_object({}) do |(name, raw), options|
          option = name.downcase.to_sym
          options[option] =
            case option
            # Anything that is not an explicit "false" value counts as true.
            when :wait_for_completion then !FALSE_VALUES.include?(raw.downcase)
            when :requests_per_second then raw.to_f
            when :scroll_size then raw.to_i
            end
        end
      end

      # Creates every index that does not exist yet. The journal index is
      # only considered when +Chewy.configuration[:journal]+ is truthy.
      #
      # @param output [IO] output io for logging
      # @param env [#[]] environment lookup; existing indexes are reported only when env['VERBOSE'] is set
      # @return [Array<Chewy::Index>] the indexes that were examined
      def create_missing_indexes!(output: $stdout, env: ENV)
        subscribed_task_stats(output) do
          Chewy.eager_load!
          candidates = Chewy::Index.descendants
          candidates -= [Chewy::Stash::Journal] unless Chewy.configuration[:journal]

          candidates.each do |index|
            if index.exists?
              output.puts "#{index.name} already exists, skipping" if env['VERBOSE']
            else
              index.create!
              output.puts "#{index.name} index successfully created"
            end
          end
        end
      end

      # Resolves a list of index identifiers into index classes.
      #
      # @param identifiers [Array<Chewy::Index, String, Array>] identifiers, optionally wrapped in one level of arrays
      # @return [Array<Chewy::Index>] one resolved index per identifier
      def normalize_indexes(*identifiers)
        # Only one level is flattened so callers may pass either a list or splatted arguments.
        identifiers.flatten(1).map(&method(:normalize_index))
      end

      # Resolves a single identifier into an index class.
      #
      # @param identifier [Chewy::Index, String, Symbol] an index class or its name without the "Index" suffix
      # @return [Chewy::Index] the identifier itself when it is already a
      #   Chewy::Index subclass, otherwise the constant named "<Camelized>Index"
      def normalize_index(identifier)
        already_index = identifier.is_a?(Class) && identifier < Chewy::Index
        already_index ? identifier : "#{identifier.to_s.camelize}Index".constantize
      end

      # Runs the block with IMPORT_CALLBACK subscribed to
      # 'import_objects.chewy' and JOURNAL_CALLBACK subscribed to
      # 'apply_journal.chewy', both writing to +output+. The total elapsed
      # time is always printed afterwards, even when the block raises.
      #
      # @param output [IO] output io for logging
      # @return presumably the block's result, as passed through by
      #   ActiveSupport::Notifications.subscribed — TODO confirm
      def subscribed_task_stats(output = $stdout, &block)
        started_at = Time.now
        journal_logger = JOURNAL_CALLBACK.curry[output]
        import_logger = IMPORT_CALLBACK.curry[output]

        ActiveSupport::Notifications.subscribed(journal_logger, 'apply_journal.chewy') do
          ActiveSupport::Notifications.subscribed(import_logger, 'import_objects.chewy', &block)
        end
      ensure
        output.puts "Total: #{human_duration(Time.now - started_at)}"
      end

    private

      # Resolves the index filters for journal tasks.
      #
      # @return [Array<Chewy::Index>, nil] the filtered indexes, or nil when
      #   neither +only+ nor +except+ was given (presumably meaning "no
      #   restriction" to Chewy::Journal — confirm)
      def journal_indexes_from(only: nil, except: nil)
        unfiltered = [only, except].all? { |filter| Array.wrap(filter).empty? }
        indexes_from(only: only, except: except) unless unfiltered
      end

      # Builds the list of indexes to process: the +only+ selection (or every
      # index in the app when it is blank) minus the +except+ selection,
      # ordered by +derivable_name+.
      #
      # @param only [Array<Chewy::Index, String>, Chewy::Index, String, nil] indexes to include
      # @param except [Array<Chewy::Index, String>, Chewy::Index, String, nil] indexes to exclude
      # @return [Array<Chewy::Index>] sorted indexes
      def indexes_from(only: nil, except: nil)
        selected = only.present? ? normalize_indexes(Array.wrap(only)) : all_indexes
        selected -= normalize_indexes(Array.wrap(except)) if except.present?
        selected.sort_by(&:derivable_name)
      end

      # Formats a duration as a compact, largest-unit-first string such as
      # "1h 2m 3s". Every unit below the largest non-zero one is always
      # printed (3600 => "1h 0m 0s"); fractional seconds are truncated.
      #
      # @param seconds [Numeric] duration in seconds
      # @return [String] formatted duration; "0s" for zero or negative input
      def human_duration(seconds)
        # Never return an empty string: it would leave a hole in log lines
        # such as "Imported X in , stats: ...".
        return '0s' unless seconds.positive?

        remaining = seconds
        parts = [[60, :s], [60, :m], [24, :h]].map do |amount, unit|
          next unless remaining.positive?

          remaining, value = remaining.divmod(amount)
          "#{value.to_i}#{unit}"
        end.compact

        # What is left after extracting hours is whole days. Without this
        # component anything past 24 hours wraps around (25h => "1h 0m 0s").
        parts << "#{remaining.to_i}d" if remaining.positive?
        parts.reverse.join(' ')
      end

      # Announces and resets a single index.
      #
      # @param index [Chewy::Index] index to reset
      # @param output [IO] output io for logging
      # @param parallel [true, Integer, Hash, false] parallel options forwarded to +reset!+
      def reset_one(index, output, parallel: false)
        output.puts "Resetting #{index}"
        # Current time in milliseconds; presumably used by reset! as the new index suffix — confirm.
        timestamp = (Time.now.to_f * 1000).round
        index.reset!(timestamp, parallel: parallel, apply_journal: journal_exists?)
      end

      # Prints a banner warning about possible data loss during reset when
      # the journal index does not exist; prints nothing otherwise.
      #
      # @param output [IO] output io for logging
      def warn_missing_index(output)
        return if journal_exists?

        border = '############################################################'
        banner = [
          border,
          'WARN: You are risking to lose some changes during the reset.',
          '      Please consider enabling journaling.',
          '      See https://github.com/toptal/chewy#journaling',
          border
        ]
        output.puts banner.join("\n")
      end

      # Tells whether the journal index exists. The answer is looked up once
      # and memoized; a nil check is used so that a cached +false+ is kept.
      #
      # @return [Boolean] result of +Chewy::Stash::Journal.exists?+
      def journal_exists?
        return @journal_exists unless @journal_exists.nil?

        @journal_exists = Chewy::Stash::Journal.exists?
      end
    end
  end
end