47colborne/dynamini

View on GitHub
lib/dynamini/batch_operations.rb

Summary

Maintainability
A
50 mins
Test Coverage
require 'ostruct'

module Dynamini
  # Batch-oriented table operations: bulk put, bulk get, bulk delete and scan.
  #
  # The methods here call `client`, `table_name`, `hash_key`, `secondary_index`
  # and `new` on the receiver, so the host must provide all of them
  # (presumably this module is extended onto the model class - confirm
  # against the including code).
  module BatchOperations
    # Max batch size is 25, per Dynamo BatchWriteItem docs.
    BATCH_WRITE_LIMIT = 25
    # Number of keys sent per batch_get_item call (BatchGetItem accepts at
    # most 100 keys per request).
    BATCH_GET_LIMIT = 100

    # Writes the given models with batched put requests.
    #
    # @param models [Enumerable] model instances exposing `attributes`
    # @param options [Hash] pass `skip_timestamps: true` to leave the models'
    #   timestamps untouched; otherwise `generate_timestamps!` is invoked on
    #   every model before its batch is written
    def import(models, options = {})
      models.each_slice(BATCH_WRITE_LIMIT) do |batch|
        batch.each { |model| model.send(:generate_timestamps!) } unless options[:skip_timestamps]
        dynamo_batch_save(batch)
      end
    end

    # Fetches many records by hash key.
    #
    # Ids are de-duplicated before being requested because DynamoDB rejects a
    # batch request that names the same key twice. `nil` is treated as an
    # empty list and a single scalar id is accepted as a one-element list.
    #
    # NOTE(review): the response's unprocessed keys are not inspected here, so
    # keys DynamoDB declined to process end up in `not_found`.
    #
    # @param ids [Array, nil] hash key values to look up
    # @return [OpenStruct] `found` (instances built via `new(attrs, false)`)
    #   and `not_found` (distinct requested ids with no matching instance)
    def batch_find(ids = [])
      requested = Array(ids).uniq
      return OpenStruct.new(found: [], not_found: []) if requested.empty?

      found = requested.each_slice(BATCH_GET_LIMIT).flat_map do |id_slice|
        keys = id_slice.map { |id| { hash_key => id } }
        # Array() tolerates a response that carries no entry for this table.
        Array(dynamo_batch_get(keys).responses[table_name]).map do |item|
          new(item.symbolize_keys, false)
        end
      end

      OpenStruct.new(found: found, not_found: requested - found.map(&hash_key))
    end

    # Deletes many records by hash key using batched delete requests.
    #
    # Ids are de-duplicated first: deleting is idempotent, and DynamoDB
    # rejects a batch that names the same key twice.
    #
    # NOTE(review): the batch_write_item response is discarded, so any
    # unprocessed items it reports are not retried.
    #
    # @param ids [Array, nil] hash key values to delete
    def batch_delete(ids)
      deletes = Array(ids).uniq.map do |id|
        { delete_request: { key: { hash_key => id } } }
      end

      deletes.each_slice(BATCH_WRITE_LIMIT) do |slice|
        client.batch_write_item(request_items: { table_name => slice })
      end
    end

    # Runs one scan request against the table or one of its secondary indexes.
    #
    # @param options [Hash] recognised keys: :consistent_read, :start_key,
    #   :index_name, :limit, :segment, :total_segments
    # @return [OpenStruct] `items` (instances built via `new(attrs, false)`)
    #   and `last_evaluated_key` as reported by the client
    # @raise [ArgumentError] when only one of :segment / :total_segments is
    #   given, or when :index_name is not a known secondary index
    def scan(options = {})
      validate_scan_options(options)
      response = dynamo_scan(options)
      OpenStruct.new(
        items: response.items.map { |i| new(i.symbolize_keys, false) },
        last_evaluated_key: response.last_evaluated_key
      )
    end

    private

    # Issues a single batch_get_item call for the given key hashes.
    def dynamo_batch_get(key_struct)
      client.batch_get_item(
        request_items: {
          table_name => { keys: key_struct }
        }
      )
    end

    # Issues a single batch_write_item call containing one put request per
    # model. Blank attribute values are omitted from the item, except for
    # `false`: `false.blank?` is true, but dropping it would silently lose
    # boolean attributes that were explicitly set to false.
    def dynamo_batch_save(model_array)
      put_requests = model_array.map do |model|
        item = model.attributes.reject { |_key, value| value != false && value.blank? }
        { put_request: { item: item.stringify_keys } }
      end

      client.batch_write_item(request_items: { table_name => put_requests })
    end

    # Translates scan options into client.scan parameters. A non-Hash
    # :start_key is wrapped into a single-attribute key hash, named after the
    # secondary index's hash key when :index_name is given and after the
    # table's hash key otherwise. Options left nil are not sent at all.
    def dynamo_scan(options)
      start_key = options[:start_key]
      if start_key && !start_key.is_a?(Hash)
        key_name = if options[:index_name]
                     secondary_index[options[:index_name]][:hash_key_name]
                   else
                     hash_key
                   end
        start_key = { key_name.to_s => start_key }
      end

      scan_params = {
        consistent_read:      options[:consistent_read],
        exclusive_start_key:  start_key,
        index_name:           options[:index_name],
        limit:                options[:limit],
        segment:              options[:segment],
        total_segments:       options[:total_segments],
        table_name:           table_name
      }
      # Only nil is stripped, so an explicit `false` still reaches the client.
      client.scan(scan_params.reject { |_name, value| value.nil? })
    end

    # Rejects inconsistent scan options before any request is made.
    def validate_scan_options(options)
      if options[:total_segments] && !options[:segment]
        raise ArgumentError, 'Must specify segment if specifying total_segments'
      elsif options[:segment] && !options[:total_segments]
        raise ArgumentError, 'Must specify total_segments if specifying segment'
      elsif options[:index_name] && !(secondary_index && secondary_index[options[:index_name]])
        raise ArgumentError, "Secondary index of #{options[:index_name]} does not exist"
      end
    end
  end
end