joelhelbling/stepladder

View on GitHub
lib/stepladder/dsl.rb

Summary

Maintainability
A
2 hrs
Test Coverage
module Stepladder
  class WorkerInitializationError < StandardError; end

  module DSL
    def source_worker(argument=nil, &block)
      ensure_correct_arity_for!(argument, block)

      series = series_from(argument)
      callable = setup_callable_for(block, series)

      return Worker.new(&callable) if series.nil?

      Worker.new do
        series.each(&callable)

        while true do
          handoff nil
        end
      end

    end

    def relay_worker(&block)
      ensure_regular_arity(block)

      Worker.new do |value|
        value && block.call(value)
      end
    end

    def side_worker(mode=:normal, &block)
      ensure_regular_arity(block)

      Worker.new do |value|
        value.tap do |v|
          used_value = mode == :hardened ?
            Marshal.load(Marshal.dump(v)) : v

          v && block.call(used_value)
        end
      end
    end

    def filter_worker(argument=nil, &block)
      if (block && argument.respond_to?(:call))
        throw_with 'You cannot supply two callables'
      end
      callable = argument.respond_to?(:call) ? argument : block
      ensure_callable(callable)

      Worker.new do |value, supply|
        while value && !callable.call(value) do
          value = supply.shift
        end
        value
      end
    end

    class BatchContext < OpenStruct
      def batch_complete?(value, collection)
        value.nil? ||
          !! batch_full.call(value, collection)
      end
    end

    def batch_worker(options = {gathering: 1}, &block)
      ensure_regular_arity(block) if block
      batch_full = block ||
        Proc.new { |_, batch| batch.size >= options[:gathering] }

      batch_context = BatchContext.new({ batch_full: batch_full })

      Worker.new(context: batch_context) do |value, supply, context|
        if value
          context.collection = [value]
          until context.batch_complete?(
              context.collection.last,
              context.collection
          )
            context.collection << supply.shift
          end
          context.collection.compact
        end
      end
    end

    def splitter_worker(&block)
      ensure_regular_arity(block)

      Worker.new do |value|
        if value.nil?
          value
        else
          parts = [block.call(value)].flatten
          while parts.size > 1 do
            handoff parts.shift
          end
          parts.shift
        end
      end
    end

    def trailing_worker(trail_length=2)
      trail = []
      Worker.new do |value, supply|
        if value
          trail.unshift value
          if trail.size >= trail_length
            trail.pop
          end
          while trail.size < trail_length
            trail.unshift supply.shift
          end

          trail
        else
          value
        end
      end
    end

    def handoff(something)
      Fiber.yield something
    end

    private

    def throw_with(*msg)
      raise WorkerInitializationError.new([msg].flatten.join(' '))
    end

    def ensure_callable(callable)
      unless callable && callable.respond_to?(:call)
        throw_with 'You must supply a callable'
      end
    end

    def ensure_regular_arity(block)
      if block.arity != 1
        throw_with \
          "Worker must accept exactly one argument (arity == 1)"
      end
    end

    # only valid for #source_worker
    def ensure_correct_arity_for!(argument, block)
      return unless block
      if argument
        ensure_regular_arity(block)
      else
        if block.arity > 0
          throw_with \
            'Source worker cannot accept any arguments (arity == 0)'
        end
      end
    end

    def series_from(series)
      return if series.nil?
      case
      when series.respond_to?(:to_a)
        series.to_a
      when series.respond_to?(:scan)
        series.scan(/./)
      else
        [series]
      end
    end

    def setup_callable_for(block, series)
      return block unless series
      if block
        return Proc.new { |value| handoff block.call(value) }
      else
        return Proc.new { |value| handoff value }
      end
    end

  end
end