lib/kiba/streaming_runner.rb

Summary

Maintainability
A
1 hr
Test Coverage
module Kiba
  module StreamingRunner
    extend self

    # allow to handle a block form just like a regular transform
    class AliasingProc < Proc
      alias_method :process, :call
    end

    def run(control)
      run_pre_processes(control)
      process_rows(
        to_instances(control.sources),
        to_instances(control.transforms, true),
        destinations = to_instances(control.destinations)
      )
      close_destinations(destinations)
      run_post_processes(control)
    end

    def run_pre_processes(control)
      to_instances(control.pre_processes, true, false).each(&:call)
    end

    def run_post_processes(control)
      to_instances(control.post_processes, true, false).each(&:call)
    end

    def close_destinations(destinations)
      destinations
        .find_all { |d| d.respond_to?(:close) }
        .each(&:close)
    end

    def transform_stream(stream, t)
      Enumerator.new do |y|
        stream.each do |input_row|
          returned_row = t.process(input_row) do |yielded_row|
            y << yielded_row
          end
          y << returned_row if returned_row
        end
        if t.respond_to?(:close)
          t.close do |close_row|
            y << close_row
          end
        end
      end
    end

    def source_stream(sources)
      Enumerator.new do |y|
        sources.each do |source|
          source.each { |r| y << r }
        end
      end
    end

    def process_rows(sources, transforms, destinations)
      stream = source_stream(sources)
      recurser = lambda { |s, t| transform_stream(s, t) }
      transforms.inject(stream, &recurser).each do |r|
        destinations.each { |d| d.write(r) }
      end
    end

    def to_instances(definitions, allow_block = false, allow_class = true)
      definitions.map do |definition|
        to_instance(
          *definition.values_at(:klass, :args, :block),
          allow_block, allow_class
        )
      end
    end

    def to_instance(klass, args, block, allow_block, allow_class)
      if klass && block
        fail "Class and block form cannot be used together at the moment"
      elsif klass
        fail "Class form is not allowed here" unless allow_class
        klass.new(*args)
      elsif block
        fail "Block form is not allowed here" unless allow_block
        AliasingProc.new(&block)
      else
        fail "Nil parameters not allowed here"
      end
    end
  end
end