toptal/chewy

lib/chewy/index/syncer.rb

module Chewy
  class Index
    # This class finds missing and outdated documents in Elasticsearch by
    # comparing ids from the data source and the ES index. Also, if `outdated_sync_field`
    # exists in the index definition, it compares the values of this field
    # for each source object and the corresponding ES document. Usually,
    # this field is `updated_at`, and if its value in the source is not equal
    # to the value in the index, the document is considered outdated and
    # should be reindexed.
    #
    # To fetch the necessary data from the source, it uses the adapter method
    # {Chewy::Index::Adapter::Base#import_fields}; when the Object adapter is
    # used, it makes sense to read the corresponding documentation.
    #
    # If the `parallel` option is passed to the initializer, it fetches the source
    # and index data in parallel and then performs the outdated-objects calculation
    # in parallel processes. Any further import (if required) is performed
    # in parallel as well.
    #
    # @note
    #   In Rails 4.0, time is converted to JSON with second precision (no
    #   milliseconds), so the outdated check is less precise there.
    #
    #   ATTENTION: synchronization may be slow when the synchronized tables
    #   are missing a compound index on the primary key and `outdated_sync_field`.
    #
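    # @example Keeping a hypothetical `UsersIndex` in sync (illustrative only)
    #   UsersIndex.sync                                      # via Actions#sync
    #   Chewy::Index::Syncer.new(UsersIndex, parallel: 4).perform
    #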
    # @see Chewy::Index::Actions::ClassMethods#sync
    class Syncer
      DEFAULT_SYNC_BATCH_SIZE = 20_000
      ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/
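      # Given `[id, sync_field_value]` pairs from the index (`index_data`), returns
      # the ids whose sync field values differ from the ones in `source_data_hash`.
      # Defined as a lambda so it can be curried and executed in `Parallel` workers.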
      OUTDATED_IDS_WORKER = lambda do |outdated_sync_field_type, source_data_hash, index, total, index_data|
        ::Process.setproctitle("chewy [#{index}]: sync outdated calculation (#{::Parallel.worker_number + 1}/#{total})") if index
        index_data.each_with_object([]) do |(id, index_sync_value), result|
          next unless source_data_hash[id]

          outdated = if outdated_sync_field_type == 'date'
            !Chewy::Index::Syncer.dates_equal(typecast_date(source_data_hash[id]), Time.iso8601(index_sync_value))
          else
            source_data_hash[id] != index_sync_value
          end

          result.push(id) if outdated
        end
      end
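      # Fetches either the source or the index data depending on `kind`, so both
      # sides can be loaded in separate `Parallel` workers; the result is wrapped
      # in a one-key hash to tell the sides apart afterwards.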
      SOURCE_OR_INDEX_DATA_WORKER = lambda do |syncer, index, kind|
        ::Process.setproctitle("chewy [#{index}]: sync fetching data (#{kind})")
        result = case kind
        when :source
          syncer.send(:fetch_source_data)
        when :index
          syncer.send(:fetch_index_data)
        end
        {kind => result}
      end

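      # Converts a non-typecast DB datetime string to a `Time` in UTC, keeping
      # fractional seconds; any other value is returned as is.
      #
      # @example
      #   Chewy::Index::Syncer.typecast_date('2014-01-01 12:00:00.123')
      #   # => 2014-01-01 12:00:00.123 +0000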
      def self.typecast_date(string)
        if string.is_a?(String) && (match = ISO_DATETIME.match(string))
          microsec = (match[7].to_r * 1_000_000).to_i
          day = "#{match[1]}-#{match[2]}-#{match[3]}"
          time_with_seconds = "#{match[4]}:#{match[5]}:#{match[6]}"
          microseconds = format('%06d', microsec)
          date = "#{day}T#{time_with_seconds}.#{microseconds}+00:00"
          Time.iso8601(date)
        else
          string
        end
      end

      # Compares times with ms precision.
      def self.dates_equal(one, two)
        [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
      end

      # @param index [Chewy::Index] chewy index
      # @param parallel [true, Integer, Hash] options for parallel execution or the number of processes
      def initialize(index, parallel: nil)
        @index = index
        @parallel = if !parallel || parallel.is_a?(Hash)
          parallel
        elsif parallel.is_a?(Integer)
          {in_processes: parallel}
        else
          {}
        end
      end

      # Finds all the missing and outdated ids and performs an import for them.
      #
      # @return [Integer, nil] the number of missing and outdated documents reindexed, or nil in case of errors
      def perform
        ids = missing_ids | outdated_ids
        return 0 if ids.blank?

        @index.import(ids, parallel: @parallel) && ids.count
      end

      # Finds the ids of all objects that are not indexed yet or have already
      # been deleted from the source.
      #
      # @return [Array<String>] an array of missing ids from both sides
      def missing_ids
        return [] if source_data.blank?

        @missing_ids ||= begin
          source_data_ids = data_ids(source_data)
          index_data_ids = data_ids(index_data)

          (source_data_ids - index_data_ids).concat(index_data_ids - source_data_ids)
        end
      end

      # If the index supports outdated sync, it compares the value of
      # `outdated_sync_field` for each source object and the corresponding
      # index document and returns the ids of the entities whose values differ.
      #
      # @see Chewy::Index::Mapping::ClassMethods#supports_outdated_sync?
      # @return [Array<String>] an array of outdated ids
      def outdated_ids
        return [] if source_data.blank? || index_data.blank? || !@index.supports_outdated_sync?

        @outdated_ids ||= if @parallel
          parallel_outdated_ids
        else
          linear_outdated_ids
        end
      end

    private

      def source_data
        @source_data ||= source_and_index_data.first
      end

      def index_data
        @index_data ||= source_and_index_data.second
      end

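      # Loads both collections, either sequentially or in two `Parallel` workers.
      # The ActiveRecord connection is closed before forking and reconnected
      # afterwards so it is not shared with the child processes.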
      def source_and_index_data
        @source_and_index_data ||= if @parallel
          ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
          result = ::Parallel.map(%i[source index], @parallel, &SOURCE_OR_INDEX_DATA_WORKER.curry[self, @index])
          ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
          if result.first.keys.first == :source
            [result.first.values.first, result.second.values.first]
          else
            [result.second.values.first, result.first.values.first]
          end
        else
          [fetch_source_data, fetch_index_data]
        end
      end

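      # Returns `[id, outdated_sync_field_value]` pairs from the source when the
      # index supports outdated sync, and plain ids otherwise; ids are stringified
      # to match the `_id` values coming from Elasticsearch.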
      def fetch_source_data
        if @index.supports_outdated_sync?
          import_fields_args = {
            fields: [@index.outdated_sync_field],
            batch_size: DEFAULT_SYNC_BATCH_SIZE,
            typecast: false
          }
          @index.adapter.import_fields(import_fields_args).to_a.flatten(1).each do |data|
            data[0] = data[0].to_s
          end
        else
          @index.adapter.import_fields(batch_size: DEFAULT_SYNC_BATCH_SIZE, typecast: false).to_a.flatten(1).map(&:to_s)
        end
      end

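      # Same shape as {#fetch_source_data}, but plucked from the index itself:
      # `_id` plus `outdated_sync_field` when outdated sync is supported.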
      def fetch_index_data
        if @index.supports_outdated_sync?
          @index.pluck(:_id, @index.outdated_sync_field).each do |data|
            data[0] = data[0].to_s
          end
        else
          @index.pluck(:_id).map(&:to_s)
        end
      end

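      # Extracts plain ids from the `[id, sync_value]` pairs; without outdated
      # sync support the data is already a list of ids.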
      def data_ids(data)
        return data unless @index.supports_outdated_sync?

        data.map(&:first)
      end

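      # Calculates outdated ids in the current process over the whole index data.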
      def linear_outdated_ids
        OUTDATED_IDS_WORKER.call(outdated_sync_field_type, source_data.to_h, nil, nil, index_data)
      end

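      # Splits the index data into one batch per worker and calculates outdated
      # ids in `Parallel` processes, closing and reconnecting the ActiveRecord
      # connection around the fork.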
      def parallel_outdated_ids
        size = processor_count.zero? ? index_data.size : (index_data.size / processor_count.to_f).ceil
        batches = index_data.each_slice(size)

        ::ActiveRecord::Base.connection.close if defined?(::ActiveRecord::Base)
        curried_outdated_ids_worker = OUTDATED_IDS_WORKER.curry[outdated_sync_field_type, source_data.to_h, @index, batches.size]
        result = ::Parallel.map(
          batches,
          @parallel,
          &curried_outdated_ids_worker
        ).flatten(1)
        ::ActiveRecord::Base.connection.reconnect! if defined?(::ActiveRecord::Base)
        result
      end

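      # Number of workers to batch for: taken from the `parallel` options, falling
      # back to `Parallel.processor_count`.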
      def processor_count
        @processor_count ||= @parallel[:in_processes] || @parallel[:in_threads] || ::Parallel.processor_count
      end

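      # Fetches the Elasticsearch mapping type of `outdated_sync_field` (e.g. `date`)
      # and memoizes it; returns nil when the field is not defined or the index
      # does not exist yet.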
      def outdated_sync_field_type
        return @outdated_sync_field_type if instance_variable_defined?(:@outdated_sync_field_type)
        return unless @index.outdated_sync_field

        mappings = @index.client.indices.get_mapping(index: @index.index_name).values.first.fetch('mappings', {})

        @outdated_sync_field_type = mappings
          .fetch('properties', {})
          .fetch(@index.outdated_sync_field.to_s, {})['type']
      rescue Elasticsearch::Transport::Transport::Errors::NotFound
        nil
      end
    end
  end
end