lib/neoid/batch.rb
module Neoid
class Batch
def default_options=(value)
@default_options = value
end
def self.default_options
@default_options ||= { batch_size: 200, individual_promises: true }
end
def self.current_batch
Thread.current[:neoid_current_batch]
end
def self.current_batch=(batch)
Thread.current[:neoid_current_batch] = batch
end
def self.reset_current_batch
Thread.current[:neoid_current_batch] = nil
end
def initialize(options={}, &block)
if options.respond_to?(:call) && !block
block = options
options = {}
end
options.reverse_merge!(self.class.default_options)
@options = options
@block = block
end
def <<(command)
commands << command
if commands.length >= @options[:batch_size]
flush_batch
end
if @options[:individual_promises]
promise = SingleResultPromiseProxy.new(command)
thens << promise
promise
end
end
def commands
@commands ||= []
end
def thens
@thens ||= []
end
def count
@commands ? @commands.count : 0
end
def results
@results ||= []
end
def run
self.class.current_batch = self
begin
@block.call(self)
ensure
self.class.reset_current_batch
end
Neoid.logger.info "Neoid batch (#{commands.length} commands)"
flush_batch
BatchPromiseProxy.new(results)
end
private
def flush_batch
return [] if commands.empty?
current_results = nil
benchmark = Benchmark.measure {
current_results = Neoid.db.batch(*commands)
current_results.map { |result| result['body'] } if current_results.respond_to?(:map)
}
Neoid.logger.info "Neoid batch (#{commands.length} commands) - #{benchmark}"
commands.clear
process_results(current_results)
thens.zip(current_results).each { |t, result| t.perform(result) }
thens.clear
results.concat current_results
end
def process_results(results)
results.map! do |result|
return result unless result.is_a?(Hash) && result['self'] && result['self'][%r[^https?://.*/(node|relationship)/\d+]]
type = case $1
when 'node' then Neoid::Node
when 'relationship' then Neoid::Relationship
else return result
end
type.from_hash(result)
end
end
end
# returned from a full batch, after it has been executed,
# so a `.then` can be chained after the batch do ... end
# it proxies all methods to the result
class BatchPromiseProxy
def initialize(results)
@results = results
end
def method_missing(method, *args)
@results.send(method, *args)
end
def then
yield(@results)
end
end
# returned from adding (<<) an item to a batch in a batch block:
# Neoid.batch { |batch| (batch << [:neography_command, param]).is_a?(SingleResultPromiseProxy) }
# so a `.then` can be chained:
# Neoid.batch { |batch| (batch << [:neography_command, param]).then { |result| puts result } }
# the `then` is called once the batch is flushed with the result of the single job in the batch
# it proxies all methods to the result, so in case it is returned (like in Neoid.execute_script_or_add_to_batch)
# the result of the method will be proxied to the result from the batch. See Node#neo_save
class SingleResultPromiseProxy
def initialize(*)
end
attr_accessor :result
def result
raise 'Accessed result too soon' unless @result
@result
end
def method_missing(method, *args)
result.send(method, *args)
end
def then(&block)
@then = block
self
end
def perform(result)
@result = result
return unless @then
@then.call(result)
end
end
end