fredjean/simpler_workflow

View on GitHub
lib/simpler_workflow/activity.rb

Summary

Maintainability
C
1 day
Test Coverage
module SimplerWorkflow
  class Activity
    include OptionsAsMethods

    attr_reader :domain, :name, :version, :options, :next_activity, :task_list

    def initialize(domain, name, version, options = {})
      default_options =
        {
        :default_task_list => name,
        :default_task_start_to_close_timeout => 5 * 60,
        :default_task_schedule_to_start_timeout => 5 * 60,
        :default_task_schedule_to_close_timeout => 10 * 60,
        :default_task_heartbeat_timeout => :none
      }
      @options = default_options.merge(options)
      @domain = domain
      @name = name
      @version = version
      @failure_policy = :fail
      @task_list = name.to_s
    end

    def on_success(activity, version = nil)
      case activity
      when Hash
        name = activity[:name].to_sym
        version = activity[:version]
      when String
        name = activity.to_sym
      when Symbol
        name = activity
      end

      @next_activity = Activity[domain, name, version]
    end

    def on_fail(failure_policy)
      @failure_policy = failure_policy.to_sym
    end

    def failure_policy
      @failure_policy || :fail
    end

    def perform_activity(&block)
      @perform_task = block
    end

    def name
      @name.to_s
    end

    def perform_task(task)
      logger.info("Performing task #{name}")
      @perform_task.call(task)
    rescue => e
      context = {}
      context[:activity_type] = [name.to_s, version]
      context[:input] = task.input
      context[:activity_id] = task.activity_id
      SimplerWorkflow.exception_reporter.report(e, context)
      task.fail! :reason => e.message[0..250], :details => {:failure_policy => failure_policy}.to_json
    end

    def to_activity_type
      domain.activity_types[name, version]
    end

    def persist_attributes
      activities.persist_attributes(self)
    end

    def simple_db_attributes
      attributes = {
        domain: domain.name,
        name: name,
        version: version,
        failure_policy: failure_policy
      }

      if (next_activity)
        attributes[:next_activity_name] = next_activity.name
        attributes[:next_activity_version] = next_activity.version
      end

      attributes
    end

    def simple_db_name
      "#{name}-#{version}"
    end

    def start_activity_loop
      SimplerWorkflow.child_processes << fork do

        $0 = "Activity: #{name} #{version}"

        Signal.trap('QUIT') do
          # Don't log in trap, ruby 2 complains
          # since we need to exit quickly, only delay quit
          # if we are in the middle of a task
          if @in_task
            @time_to_exit = true 
          else
            Process.exit 0
          end
        end

        Signal.trap('INT') do
          # Don't log in trap, ruby 2 complains
          Process.exit!(0)
        end


        if SimplerWorkflow.after_fork
          SimplerWorkflow.after_fork.call
        end

        loop do
          begin
            logger.info("Starting activity_loop for #{name}")
            domain.activity_tasks.poll(task_list) do |task|
              begin
                logger.info("Received task...")
                @in_task = true
                perform_task(task)
                unless task.responded?
                  task.complete!
                end
              rescue => e
                context = {}
                context[:activity_type] = [name.to_s, version]
                context[:input] = task.input
                context[:activity_id] = task.activity_id
                SimplerWorkflow.exception_reporter.report(e, context)
                task.fail! :reason => e.message, :details => { :failure_policy => :fail }.to_json unless task.responded?
              ensure
                @in_task = false
              end
            end
            Process.exit(0) if @time_to_exit
          rescue Timeout::Error
            if @time_to_exit
              Process.exit(0)
            else
              retry
            end
          end
        end
      end
    end

    def poll_for_single_task
      logger.info("Polling for single task for #{name}")
      domain.activity_tasks.poll_for_single_task(name.to_s)
    end

    def count
      domain.activity_tasks.count(name).to_i
    end

    def self.[](*activity_tuple)
      activities[*activity_tuple]
    end

    def self.[]=(*activity_tuple)
      activities.[]=(*activity_tuple)
    end

    def self.register(domain, name, version, activity)
      activities.register(domain, name, version, activity)
    end

    protected

    def activities
      self.class.activities
    end

    def self.activities
      @activities ||= ActivityRegistry.new
    end

    def self.swf
      SimplerWorkflow.swf
    end

    def logger
      $logger || Rails.logger
    end
  end
end