neoid-gem/neoid

View on GitHub
lib/neoid/batch.rb

Summary

Maintainability
A
25 mins
Test Coverage
module Neoid
  class Batch
    def default_options=(value)
      @default_options = value
    end

    def self.default_options
      @default_options ||= { batch_size: 200, individual_promises: true }
    end

    def self.current_batch
      Thread.current[:neoid_current_batch]
    end

    def self.current_batch=(batch)
      Thread.current[:neoid_current_batch] = batch
    end

    def self.reset_current_batch
      Thread.current[:neoid_current_batch] = nil
    end

    def initialize(options={}, &block)
      if options.respond_to?(:call) && !block
        block = options
        options = {}
      end

      options.reverse_merge!(self.class.default_options)

      @options = options
      @block = block
    end

    def <<(command)
      commands << command

      if commands.length >= @options[:batch_size]
        flush_batch
      end

      if @options[:individual_promises]
        promise = SingleResultPromiseProxy.new(command)
        thens << promise
        promise
      end
    end

    def commands
      @commands ||= []
    end

    def thens
      @thens ||= []
    end

    def count
      @commands ? @commands.count : 0
    end

    def results
      @results ||= []
    end

    def run
      self.class.current_batch = self

      begin
        @block.call(self)
      ensure
        self.class.reset_current_batch
      end

      Neoid.logger.info "Neoid batch (#{commands.length} commands)"

      flush_batch

      BatchPromiseProxy.new(results)
    end

    private

    def flush_batch
      return [] if commands.empty?
      current_results = nil

      benchmark = Benchmark.measure {
        current_results = Neoid.db.batch(*commands)
        current_results.map { |result| result['body'] } if current_results.respond_to?(:map)
      }
      Neoid.logger.info "Neoid batch (#{commands.length} commands) - #{benchmark}"
      commands.clear

      process_results(current_results)

      thens.zip(current_results).each { |t, result| t.perform(result) }

      thens.clear

      results.concat current_results
    end

    def process_results(results)
      results.map! do |result|
        return result unless result.is_a?(Hash) && result['self'] && result['self'][%r[^https?://.*/(node|relationship)/\d+]]

        type = case $1
               when 'node' then Neoid::Node
               when 'relationship' then Neoid::Relationship
               else return result
               end

        type.from_hash(result)
      end
    end
  end

  # returned from a full batch, after it has been executed,
  # so a `.then` can be chained after the batch do ... end
  # it proxies all methods to the result
  class BatchPromiseProxy
    def initialize(results)
      @results = results
    end

    def method_missing(method, *args)
      @results.send(method, *args)
    end

    def then
      yield(@results)
    end
  end

  # returned from adding (<<) an item to a batch in a batch block:
  # Neoid.batch { |batch| (batch << [:neography_command, param]).is_a?(SingleResultPromiseProxy) }
  # so a `.then` can be chained:
  # Neoid.batch { |batch| (batch << [:neography_command, param]).then { |result| puts result } }
  # the `then` is called once the batch is flushed with the result of the single job in the batch
  # it proxies all methods to the result, so in case it is returned (like in Neoid.execute_script_or_add_to_batch)
  # the result of the method will be proxied to the result from the batch. See Node#neo_save
  class SingleResultPromiseProxy
    def initialize(*)
    end

    attr_accessor :result

    def result
      raise 'Accessed result too soon' unless @result
      @result
    end

    def method_missing(method, *args)
      result.send(method, *args)
    end

    def then(&block)
      @then = block
      self
    end

    def perform(result)
      @result = result
      return unless @then
      @then.call(result)
    end
  end
end