ManageIQ/inventory_refresh

View on GitHub
lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb

Summary

Maintainability
F
3 days
Test Coverage
A
93%
require "inventory_refresh/save_collection/saver/base"
require "inventory_refresh/save_collection/saver/partial_upsert_helper"
require "inventory_refresh/save_collection/saver/retention_helper"
require "active_support/core_ext/module/delegation"

module InventoryRefresh::SaveCollection
  module Saver
    class ConcurrentSafeBatch < InventoryRefresh::SaveCollection::Saver::Base
      private

      # Methods for archiving or deleting non existent records
      include InventoryRefresh::SaveCollection::Saver::PartialUpsertHelper
      include InventoryRefresh::SaveCollection::Saver::RetentionHelper

      delegate :association_to_base_class_mapping,
               :association_to_foreign_key_mapping,
               :association_to_foreign_type_mapping,
               :attribute_references,
               :resource_version_column,
               :to => :inventory_collection

      # Attribute accessor to ApplicationRecord object or Hash
      #
      # @param record [Hash, ApplicationRecord] record or hash
      # @param key [String] key pointing to attribute of the record
      # @return [Object] value of the record on the key
      def record_key(record, key)
        send(record_key_method, record, key)
      end

      # Attribute accessor to ApplicationRecord object
      #
      # @param record [ApplicationRecord] record
      # @param key [String] key pointing to attribute of the record
      # @return [Object] value of the record on the key
      def ar_record_key(record, key)
        record.public_send(key)
      end

      # Attribute accessor to Hash object
      #
      # @param record [Hash] hash
      # @param key [String] key pointing to attribute of the record
      # @return [Object] value of the record on the key
      def pure_sql_record_key(record, key)
        record[select_keys_indexes[key]]
      end

      # Returns iterator or relation based on settings
      #
      # @param association [Symbol] An existing association on manager
      # @return [ActiveRecord::Relation, InventoryRefresh::ApplicationRecordIterator] iterator or relation based on settings
      def batch_iterator(association)
        if pure_sql_records_fetching
          # Building fast iterator doing pure SQL query and therefore avoiding redundant creation of AR objects. The
          # iterator responds to find_in_batches, so it acts like the AR relation. For targeted refresh, the association
          # can already be ApplicationRecordIterator, so we will skip that.
          pure_sql_iterator = lambda do |&block|
            primary_key_offset = nil
            loop do
              relation    = association.select(*select_keys)
                                       .reorder("#{primary_key} ASC")
                                       .limit(batch_size)
              # Using rails way of comparing primary key instead of offset
              relation    = relation.where(arel_primary_key.gt(primary_key_offset)) if primary_key_offset
              records     = get_connection.query(relation.to_sql)
              last_record = records.last
              block.call(records)

              break if records.size < batch_size

              primary_key_offset = record_key(last_record, primary_key)
            end
          end

          InventoryRefresh::ApplicationRecordIterator.new(:iterator => pure_sql_iterator)
        else
          # Normal Rails ActiveRecord::Relation where we can call find_in_batches or
          # InventoryRefresh::ApplicationRecordIterator passed from targeted refresh
          association
        end
      end

      # Saves the InventoryCollection
      #
      # @param association [Symbol] An existing association on manager
      def save!(association)
        attributes_index        = {}
        inventory_objects_index = {}
        all_attribute_keys      = Set.new + inventory_collection.batch_extra_attributes

        inventory_collection.each do |inventory_object|
          attributes = inventory_object.attributes_with_keys(inventory_collection, all_attribute_keys, inventory_object)
          index      = build_stringified_reference(attributes, unique_index_keys)

          # Interesting fact: not building attributes_index and using only inventory_objects_index doesn't do much
          # of a difference, since the most objects inside are shared.
          attributes_index[index]        = attributes
          inventory_objects_index[index] = inventory_object
        end

        expand_all_attribute_keys!(all_attribute_keys)

        logger.debug("Processing #{inventory_collection} of size #{inventory_collection.size}...")

        unless inventory_collection.create_only?
          update_or_destroy_records!(batch_iterator(association), inventory_objects_index, attributes_index, all_attribute_keys)
        end

        unless inventory_collection.create_only?
          inventory_collection.custom_reconnect_block&.call(inventory_collection, inventory_objects_index, attributes_index)
        end

        # Records that were not found in the DB but sent for saving, we will be creating these in the DB.
        if inventory_collection.create_allowed?
          on_conflict = inventory_collection.parallel_safe? ? :do_update : nil

          inventory_objects_index.each_slice(batch_size_for_persisting) do |batch|
            create_records!(all_attribute_keys, batch, attributes_index, :on_conflict => on_conflict)
          end

          if inventory_collection.parallel_safe?
            create_or_update_partial_records(all_attribute_keys)
          end
        end

        logger.debug("Marking :last_seen_at of #{inventory_collection} of size #{inventory_collection.size}...")

        mark_last_seen_at(attributes_index)

        # Let the GC clean this up
        inventory_objects_index = nil
        attributes_index = nil

        logger.debug("Processing #{inventory_collection}, "\
                     "created=#{inventory_collection.created_records.count}, "\
                     "updated=#{inventory_collection.updated_records.count}, "\
                     "deleted=#{inventory_collection.deleted_records.count}...Complete")
      rescue => e
        logger.error("Error when saving #{inventory_collection} with #{inventory_collection_details}. Message: #{e.message}")
        raise e
      end

      def expand_all_attribute_keys!(all_attribute_keys)
        %i[created_at updated_at created_on updated_on].each do |col|
          all_attribute_keys << col if supports_column?(col)
        end
        all_attribute_keys << :type if supports_sti?
        all_attribute_keys << :archived_at if supports_column?(:archived_at)
      end

      def mark_last_seen_at(attributes_index)
        return unless supports_column?(:last_seen_at) && inventory_collection.parallel_safe?
        return if attributes_index.blank?

        all_attribute_keys = [:last_seen_at]

        last_seen_at = Time.now.utc
        attributes_index.each_value { |v| v[:last_seen_at] = last_seen_at }

        query = build_partial_update_query(all_attribute_keys, attributes_index.values)

        get_connection.execute(query)
      end

      # Batch updates existing records that are in the DB using attributes_index. And delete the ones that were not
      # present in inventory_objects_index.
      #
      # @param records_batch_iterator [ActiveRecord::Relation, InventoryRefresh::ApplicationRecordIterator] iterator or
      #        relation, both responding to :find_in_batches method
      # @param inventory_objects_index [Hash{String => InventoryRefresh::InventoryObject}] Hash of InventoryObject objects
      # @param attributes_index [Hash{String => Hash}] Hash of data hashes with only keys that are column names of the
      #        models's table
      # @param all_attribute_keys [Array<Symbol>] Array of all columns we will be saving into each table row
      def update_or_destroy_records!(records_batch_iterator, inventory_objects_index, attributes_index, all_attribute_keys)
        hashes_for_update   = []
        records_for_destroy = []
        indexed_inventory_objects = {}

        records_batch_iterator.find_in_batches(:batch_size => batch_size) do |batch|
          update_time = time_now

          batch.each do |record|
            primary_key_value = record_key(record, primary_key)

            next unless assert_distinct_relation(primary_key_value)

            index = db_columns_index(record)

            inventory_object = inventory_objects_index.delete(index)
            hash             = attributes_index[index]

            if inventory_object.nil?
              # Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
              # delete it from the DB.
              if inventory_collection.delete_allowed?
                records_for_destroy << record
              end
            else
              # Record was found in the DB and sent for saving, we will be updating the DB.
              inventory_object.id = primary_key_value
              next unless assert_referential_integrity(hash)
              next unless changed?(record, hash, all_attribute_keys)

              if inventory_collection.parallel_safe? &&
                 (supports_remote_data_timestamp?(all_attribute_keys) || supports_remote_data_version?(all_attribute_keys))

                version_attr, max_version_attr = if supports_remote_data_timestamp?(all_attribute_keys)
                                                   [:resource_timestamp, :resource_timestamps_max]
                                                 elsif supports_remote_data_version?(all_attribute_keys)
                                                   [:resource_counter, :resource_counters_max]
                                                 end

                next if skeletonize_or_skip_record(record_key(record, version_attr),
                                                   hash[version_attr],
                                                   record_key(record, max_version_attr),
                                                   inventory_object)
              end

              hash_for_update = if inventory_collection.use_ar_object?
                                  record.assign_attributes(hash.except(:id))
                                  values_for_database!(all_attribute_keys,
                                                       record.attributes.symbolize_keys)
                                elsif serializable_keys?
                                  # TODO(lsmola) hash data with current DB data to allow subset of data being sent,
                                  # otherwise we would nullify the not sent attributes. Test e.g. on disks in cloud
                                  values_for_database!(all_attribute_keys,
                                                       hash)
                                else
                                  # TODO(lsmola) hash data with current DB data to allow subset of data being sent,
                                  # otherwise we would nullify the not sent attributes. Test e.g. on disks in cloud
                                  hash
                                end
              assign_attributes_for_update!(hash_for_update, update_time)

              hash_for_update[:id] = primary_key_value
              indexed_inventory_objects[index] = inventory_object
              hashes_for_update << hash_for_update
            end
          end

          # Update in batches
          if hashes_for_update.size >= batch_size_for_persisting
            update_records!(all_attribute_keys, hashes_for_update, indexed_inventory_objects)

            hashes_for_update = []
            indexed_inventory_objects = {}
          end

          # Destroy in batches
          if records_for_destroy.size >= batch_size_for_persisting
            destroy_records!(records_for_destroy)
            records_for_destroy = []
          end
        end

        # Update the last batch
        update_records!(all_attribute_keys, hashes_for_update, indexed_inventory_objects)
        hashes_for_update = [] # Cleanup so GC can release it sooner

        # Destroy the last batch
        destroy_records!(records_for_destroy)
        records_for_destroy = [] # Cleanup so GC can release it sooner
      end

      def changed?(_record, _hash, _all_attribute_keys)
        return true unless inventory_collection.check_changed?

        # TODO(lsmola) this check needs to be disabled now, because it doesn't work with lazy_find having secondary
        # indexes. Examples: we save a pod before we save a project, that means the project lazy_find won't evaluate,
        # because we load it with secondary index and can't do skeletal precreate. Then when the object is being saved
        # again, the lazy_find is evaluated, but the resource version is not changed, so the row is not saved.
        #
        # To keep this quick .changed? check, we might need to extend this, so the resource_version doesn't save until
        # all lazy_links of the row are evaluated.
        #
        # if supports_resource_version?(all_attribute_keys) && supports_column?(resource_version_column)
        #   record_resource_version = record_key(record, resource_version_column.to_s)
        #
        #   return record_resource_version != hash[resource_version_column]
        # end

        true
      end

      def db_columns_index(record, pure_sql: false)
        # Incoming values are in SQL string form.
        # TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection
        # TODO(lsmola) maybe we can drop the whole pure sql fetching, since everything will be targeted refresh
        # with streaming refresh? Maybe just metrics and events will not be, but those should be upsert only
        # TODO(lsmola) taking ^ in account, we can't drop pure sql, since that is returned by batch insert and
        # update queries
        unique_index_keys_to_s.map do |attribute|
          value = if pure_sql
                    record[attribute]
                  else
                    record_key(record, attribute)
                  end

          format_value(attribute, value)
        end.join("__")
      end

      def format_value(attribute, value)
        if attribute == "timestamp"
          # TODO: can this be covered by @deserializable_keys?
          type = model_class.type_for_attribute(attribute)
          type.cast(value).utc.iso8601.to_s
        elsif (type = deserializable_keys[attribute.to_sym])
          type.deserialize(value).to_s
        else
          value.to_s
        end
      end

      # Batch updates existing records
      #
      # @param hashes [Array<Hash>] data used for building a batch update sql query
      # @param all_attribute_keys [Array<Symbol>] Array of all columns we will be saving into each table row
      def update_records!(all_attribute_keys, hashes, indexed_inventory_objects)
        return if hashes.blank?

        unless inventory_collection.parallel_safe?
          # We need to update the stored records before we save it, since hashes are modified
          inventory_collection.store_updated_records(hashes)
        end

        query = build_update_query(all_attribute_keys, hashes)
        result = get_connection.execute(query)

        if inventory_collection.parallel_safe?
          # We will check for timestamp clashes of full row update and we will fallback to skeletal update
          inventory_collection.store_updated_records(result)

          skeletonize_ignored_records!(indexed_inventory_objects, result)
        end

        result
      end

      # Batch inserts records using attributes_index data. With on_conflict option using :do_update, this method
      # does atomic upsert.
      #
      # @param all_attribute_keys [Array<Symbol>] Array of all columns we will be saving into each table row
      # @param batch [Array<InventoryRefresh::InventoryObject>] Array of InventoryObject object we will be inserting into
      #        the DB
      # @param attributes_index [Hash{String => Hash}] Hash of data hashes with only keys that are column names of the
      #        models's table
      # @param on_conflict [Symbol, NilClass] defines behavior on conflict with unique index constraint, allowed values
      #        are :do_update, :do_nothing, nil
      def create_records!(all_attribute_keys, batch, attributes_index, on_conflict: nil)
        indexed_inventory_objects = {}
        hashes                    = []
        create_time               = time_now
        batch.each do |index, inventory_object|
          hash = if inventory_collection.use_ar_object?
                   record = inventory_collection.model_class.new(attributes_index[index])
                   values_for_database!(all_attribute_keys,
                                        record.attributes.symbolize_keys)
                 elsif serializable_keys?
                   values_for_database!(all_attribute_keys,
                                        attributes_index[index])
                 else
                   attributes_index[index]
                 end

          assign_attributes_for_create!(hash, create_time)

          next unless assert_referential_integrity(hash)

          hashes << hash
          # Index on Unique Columns values, so we can easily fill in the :id later
          indexed_inventory_objects[unique_index_columns.map { |x| hash[x] }] = inventory_object
        end

        return if hashes.blank?

        result = get_connection.execute(
          build_insert_query(all_attribute_keys, hashes, :on_conflict => on_conflict, :mode => :full)
        )

        if inventory_collection.parallel_safe?
          # We've done upsert, so records were either created or updated. We can recognize that by checking if
          # created and updated timestamps are the same
          created_attr = "created_on" if inventory_collection.supports_column?(:created_on)
          created_attr ||= "created_at" if inventory_collection.supports_column?(:created_at)
          updated_attr = "updated_on" if inventory_collection.supports_column?(:updated_on)
          updated_attr ||= "updated_at" if inventory_collection.supports_column?(:updated_at)

          if created_attr && updated_attr
            created, updated = result.to_a.partition { |x| x[created_attr] == x[updated_attr] }
            inventory_collection.store_created_records(created)
            inventory_collection.store_updated_records(updated)
          else
            # The record doesn't have both created and updated attrs, so we'll take all as created
            inventory_collection.store_created_records(result)
          end
        else
          # We've done just insert, so all records were created
          inventory_collection.store_created_records(result)
        end

        if inventory_collection.dependees.present?
          # We need to get primary keys of the created objects, but only if there are dependees that would use them
          map_ids_to_inventory_objects(indexed_inventory_objects,
                                       all_attribute_keys,
                                       hashes,
                                       result,
                                       :on_conflict => on_conflict)
        end

        if inventory_collection.parallel_safe?
          skeletonize_ignored_records!(indexed_inventory_objects, result, :all_unique_columns => true)
        end
      end

      # Stores primary_key values of created records into associated InventoryObject objects.
      #
      # @param indexed_inventory_objects [Hash{String => InventoryRefresh::InventoryObject}] inventory objects indexed
      #        by stringified value made from db_columns
      # @param all_attribute_keys [Array<Symbol>] Array of all columns we will be saving into each table row
      # @param hashes [Array<Hashes>] Array of hashes that were used for inserting of the data
      # @param result [Array<Hashes>] Array of hashes that are a result of the batch insert query, each result
      #        contains a primary key_value plus all columns that are a part of the unique index
      # @param on_conflict [Symbol, NilClass] defines behavior on conflict with unique index constraint, allowed values
      #        are :do_update, :do_nothing, nil
      def map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys, hashes, result, on_conflict:)
        if on_conflict == :do_nothing
          # TODO(lsmola) is the comment below still accurate? We will update some partial rows, the actual skeletal
          # precreate will still do nothing.
          # For ON CONFLICT DO NOTHING, we need to always fetch the records plus the attribute_references. This path
          # applies only for skeletal precreate.
          inventory_collection.model_class.where(
            build_multi_selection_query(hashes)
          ).select(unique_index_columns + [:id] + attribute_references.to_a).each do |record|
            key              = unique_index_columns.map { |x| record.public_send(x) }
            inventory_object = indexed_inventory_objects[key]

            # Load also attribute_references, so lazy_find with :key pointing to skeletal reference works
            attributes = record.attributes.symbolize_keys
            attribute_references.each do |ref|
              inventory_object[ref] = attributes[ref]

              next unless (foreign_key = association_to_foreign_key_mapping[ref])

              base_class_name       = attributes[association_to_foreign_type_mapping[ref].try(:to_sym)] || association_to_base_class_mapping[ref]
              id                    = attributes[foreign_key.to_sym]
              inventory_object[ref] = InventoryRefresh::ApplicationRecordReference.new(base_class_name, id)
            end

            inventory_object.id = record.id if inventory_object
          end
        elsif !supports_remote_data_timestamp?(all_attribute_keys) || result.count == batch_size_for_persisting
          # We can use the insert query result to fetch all primary_key values, which makes this the most effective
          # path.
          result.each do |inserted_record|
            key = unique_index_columns.map do |x|
              value = inserted_record[x.to_s]
              type = deserializable_keys[x]
              type ? type.deserialize(value) : value
            end
            inventory_object    = indexed_inventory_objects[key]
            inventory_object.id = inserted_record[primary_key] if inventory_object
          end
        else
          # The remote_data_timestamp is adding a WHERE condition to ON CONFLICT UPDATE. As a result, the RETURNING
          # clause is not guaranteed to return all ids of the inserted/updated records in the result. In that case
          # we test if the number of results matches the expected batch size. Then if the counts do not match, the only
          # safe option is to query all the data from the DB, using the unique_indexes. The batch size will also not match
          # for every remainders(a last batch in a stream of batches)
          inventory_collection.model_class.where(
            build_multi_selection_query(hashes)
          ).select(unique_index_columns + [:id]).each do |inserted_record|
            key                 = unique_index_columns.map { |x| inserted_record.public_send(x) }
            inventory_object    = indexed_inventory_objects[key]
            inventory_object.id = inserted_record.id if inventory_object
          end
        end
      end
    end
  end
end