joker1007/rukawa

View on GitHub
lib/rukawa/wrapper/active_job.rb

Summary

Maintainability
A
2 hrs
Test Coverage
require 'rukawa/job'
require 'rukawa/wrapper'
require 'rukawa/remote/status_store'

module Rukawa
  module Wrapper
    module ActiveJob
      def self.[](job_class)
        raise "Please set Rukawa.config.status_store subclass of ActiveSupport::Cache::Store" unless Rukawa.config.status_store
        @wrapper_classes ||= {}
        return @wrapper_classes[job_class] if @wrapper_classes[job_class]

        wrapper = Class.new(Rukawa::Job) do
          set_resource_count 0

          define_singleton_method(:origin_class) do
            job_class
          end

          def initialize(variables: {}, context: Context.new, parent_job_net: nil)
            super
            @job_class = self.class.origin_class
          end

          def run
            @job_class.include(Hooks) unless @job_class.include?(Hooks)
            @job_class.prepend(HooksForFailure) unless @job_class.include?(HooksForFailure)
            job = @job_class.perform_later

            status_store = Rukawa::Remote::StatusStore.new(job_id: job.job_id)
            finish_statuses = [Rukawa::Remote::StatusStore::COMPLETED, Rukawa::Remote::StatusStore::FAILED]
            until finish_statuses.include?(last_status = status_store.fetch)
              sleep 0.1
            end

            status_store.delete

            raise WrappedJobError if last_status == Rukawa::Remote::StatusStore::FAILED
          end
        end

        @wrapper_classes[job_class] = wrapper
        Rukawa::Wrapper.const_set("#{job_class.to_s.gsub(/::/, "_")}Wrapper", wrapper)
        wrapper
      end
    end

    module Hooks
      def self.included(base)
        base.class_eval do
          before_enqueue { status_store.enqueued }

          before_perform { status_store.performing }

          after_perform { status_store.completed }
        end
      end

      private

      def status_store
        @status_store ||= Rukawa::Remote::StatusStore.new(job_id: job_id)
      end
    end

    module HooksForFailure
      def perform(*args)
        super
      rescue Exception
        status_store.failed
        raise
      end
    end
  end
end