virtualstaticvoid/taskinator

View on GitHub
lib/taskinator/builder.rb

Summary

Maintainability
A
35 mins
Test Coverage
module Taskinator
  # Receiver of the process definition DSL.
  #
  # A Builder wraps the process being populated, the definition that supplies
  # the task methods, and the arguments handed to every task it defines.
  # Blocks given to +sequential+, +concurrent+ and +for_each+ are run with
  # +instance_eval+ against a Builder, so the DSL methods below are called
  # without an explicit receiver inside those blocks.
  #
  # Every DSL method returns +nil+; its effect is the tasks appended to
  # +process.tasks+.
  class Builder

    attr_reader :process
    attr_reader :definition
    attr_reader :args
    attr_reader :builder_options

    # process    - the process whose +tasks+ collection receives new tasks
    # definition - the definition; it is wrapped in a Taskinator::Executor,
    #              which is used to check that task methods exist and to
    #              invoke +for_each+ iterator methods
    # args       - arguments handed to each defined task; when the last one
    #              is a Hash it is removed and kept as +builder_options+
    def initialize(process, definition, *args)
      @process = process
      @definition = definition
      # a trailing Hash is treated as options, not as a task argument
      @builder_options = args.last.is_a?(Hash) ? args.pop : {}
      @args = args
      @executor = Taskinator::Executor.new(@definition)
    end

    # Tests a builder option.
    #
    # With a block: calls the block when +builder_options[key]+ is truthy and
    # returns the block's value, otherwise returns +nil+.
    # Without a block: returns +true+ or +false+ instead of raising
    # LocalJumpError, so the method is also usable as a plain predicate.
    def option?(key, &block)
      enabled = builder_options[key]
      return !!enabled unless block

      block.call if enabled
    end

    # defines a sub process of tasks which are executed sequentially
    #
    # The block is evaluated against a new Builder for the sub process.
    # The sub process task is only added to the process when the block
    # defined at least one task.
    #
    # Raises ArgumentError when no block is given.
    #
    # NOTE(review): the nested builder is created from @args only, so this
    # builder's options are not handed down to it - confirm this is intended.
    def sequential(options={}, &block)
      raise ArgumentError, 'block' unless block_given?

      sub_process = Process.define_sequential_process_for(@definition, options)
      task = define_sub_process_task(@process, sub_process, options)
      Builder.new(sub_process, @definition, *@args).instance_eval(&block)
      @process.tasks << task if sub_process.tasks.any?
      nil
    end

    # defines a sub process of tasks which are executed concurrently
    #
    # +complete_on+ is passed through to the concurrent process factory.
    # The block is evaluated against a new Builder for the sub process.
    # The sub process task is only added to the process when the block
    # defined at least one task.
    #
    # Raises ArgumentError when no block is given.
    #
    # NOTE(review): the nested builder is created from @args only, so this
    # builder's options are not handed down to it - confirm this is intended.
    def concurrent(complete_on=CompleteOn::Default, options={}, &block)
      raise ArgumentError, 'block' unless block_given?

      sub_process = Process.define_concurrent_process_for(@definition, complete_on, options)
      task = define_sub_process_task(@process, sub_process, options)
      Builder.new(sub_process, @definition, *@args).instance_eval(&block)
      @process.tasks << task if sub_process.tasks.any?
      nil
    end

    # dynamically defines tasks, using the given @iterator method
    # the definition will be evaluated for each yielded item
    #
    # Raises ArgumentError when +method+ is nil or no block is given, and
    # NoMethodError when the executor does not respond to +method+.
    def for_each(method, options={}, &block)
      raise ArgumentError, 'method' if method.nil?
      raise NoMethodError, method.to_s unless @executor.respond_to?(method)
      raise ArgumentError, 'block' unless block_given?

      #
      # `for_each` is an exception, since it invokes the definition
      # in order to yield elements to the builder, and any options passed
      # are appended to the arguments given to the iterator method
      #
      method_args = options.any? ? [*@args, options] : @args
      @executor.send(method, *method_args) do |*yielded|
        # each yielded item gets its own builder on the same process, with
        # the yielded values as that builder's arguments
        Builder.new(@process, @definition, *yielded).instance_eval(&block)
      end
      nil
    end

    alias_method :transform, :for_each

    # defines a task which executes the given @method
    #
    # Raises ArgumentError when +method+ is nil and NoMethodError when the
    # executor does not respond to +method+.
    def task(method, options={})
      raise ArgumentError, 'method' if method.nil?
      raise NoMethodError, method.to_s unless @executor.respond_to?(method)

      define_step_task(@process, method, @args, options)
      nil
    end

    # defines a task which executes the given @job
    # which is expected to implement a perform method either as a class or instance method
    #
    # Raises ArgumentError when +job+ is nil or offers no +perform+ method.
    def job(job, options={})
      raise ArgumentError, 'job' if job.nil?
      raise ArgumentError, 'job' unless performable?(job)

      define_job_task(@process, job, @args, options)
      nil
    end

    # TODO: add mailer
    # TODO: add complete!
    # TODO: add fail!

    # defines a sub process task, for the given @definition
    # the definition specified must have input compatible arguments
    # to the current definition
    #
    # The sub process is created by +definition.create_sub_process+, which
    # receives this builder's arguments followed by the combined options.
    # The task is only added when that sub process has tasks.
    #
    # Raises ArgumentError when +definition+ is nil or is not a Definition.
    def sub_process(definition, options={})
      raise ArgumentError, 'definition' if definition.nil?
      raise ArgumentError, "#{definition.name} does not extend the #{Definition.name} module" unless definition.kind_of?(Definition)

      sub_process = definition.create_sub_process(*@args, combine_options(options))
      task = define_sub_process_task(@process, sub_process, options)
      # no nested Builder is needed here: no block is evaluated against the
      # sub process, it is populated by create_sub_process above
      @process.tasks << task if sub_process.tasks.any?
      nil
    end

  private

    # true when +job+ offers +perform+ itself or, for objects which can list
    # instance methods (classes and modules), on its instances
    def performable?(job)
      return true if job.methods.include?(:perform)

      job.respond_to?(:instance_methods) && job.instance_methods.include?(:perform)
    end

    # creates a step task for +method+ and appends it to +process+
    def define_step_task(process, method, args, options={})
      define_task(process) {
        Task.define_step_task(process, method, args, combine_options(options))
      }
    end

    # creates a job task for +job+ and appends it to +process+
    def define_job_task(process, job, args, options={})
      define_task(process) {
        Task.define_job_task(process, job, args, combine_options(options))
      }
    end

    # creates the task wrapping +sub_process+; it is NOT appended here, the
    # callers add it only when the sub process has tasks
    def define_sub_process_task(process, sub_process, options={})
      Task.define_sub_process_task(process, sub_process, combine_options(options))
    end

    # appends the task produced by the block to +process+ and returns it
    def define_task(process)
      task = yield
      process.tasks << task
      task
    end

    # builder options overlaid with the given options (given options win)
    def combine_options(options={})
      builder_options.merge(options)
    end

  end
end