breamware/sidekiq-batch

View on GitHub
lib/sidekiq/batch/callback.rb

Summary

Maintainability
A
1 hr
Test Coverage
F
59%
module Sidekiq
  class Batch
    module Callback
      class Worker
        include Sidekiq::Worker

        def perform(clazz, event, opts, bid, parent_bid)
          return unless %w(success complete).include?(event)
          clazz, method = clazz.split("#") if (clazz && clazz.class == String && clazz.include?("#"))
          method = "on_#{event}" if method.nil?
          status = Sidekiq::Batch::Status.new(bid)

          if clazz && object = Object.const_get(clazz)
            instance = object.new
            instance.send(method, status, opts) if instance.respond_to?(method)
          end
        end
      end

      class Finalize
        def dispatch status, opts
          bid = opts["bid"]
          callback_bid = status.bid
          event = opts["event"].to_sym
          callback_batch = bid != callback_bid

          Sidekiq.logger.debug {"Finalize #{event} batch id: #{opts["bid"]}, callback batch id: #{callback_bid} callback_batch #{callback_batch}"}

          batch_status = Status.new bid
          send(event, bid, batch_status, batch_status.parent_bid)


          # Different events are run in different callback batches
          Sidekiq::Batch.cleanup_redis callback_bid if callback_batch
          Sidekiq::Batch.cleanup_redis bid if event == :success
        end

        def success(bid, status, parent_bid)
          return unless parent_bid

          _, _, success, _, complete, pending, children, failure = Sidekiq.redis do |r|
            r.multi do |pipeline|
              pipeline.sadd("BID-#{parent_bid}-success", [bid])
              pipeline.expire("BID-#{parent_bid}-success", Sidekiq::Batch::BID_EXPIRE_TTL)
              pipeline.scard("BID-#{parent_bid}-success")
              pipeline.sadd("BID-#{parent_bid}-complete", [bid])
              pipeline.scard("BID-#{parent_bid}-complete")
              pipeline.hincrby("BID-#{parent_bid}", "pending", 0)
              pipeline.hincrby("BID-#{parent_bid}", "children", 0)
              pipeline.scard("BID-#{parent_bid}-failed")
            end
          end
          # if job finished successfully and parent batch completed call parent complete callback
          # Success callback is called after complete callback
          if complete == children && pending == failure
            Sidekiq.logger.debug {"Finalize parent complete bid: #{parent_bid}"}
            Batch.enqueue_callbacks(:complete, parent_bid)
          end

        end

        def complete(bid, status, parent_bid)
          pending, children, success = Sidekiq.redis do |r|
            r.multi do |pipeline|
              pipeline.hincrby("BID-#{bid}", "pending", 0)
              pipeline.hincrby("BID-#{bid}", "children", 0)
              pipeline.scard("BID-#{bid}-success")
            end
          end

          # if we batch was successful run success callback
          if pending.to_i.zero? && children == success
            Batch.enqueue_callbacks(:success, bid)

          elsif parent_bid
            # if batch was not successfull check and see if its parent is complete
            # if the parent is complete we trigger the complete callback
            # We don't want to run this if the batch was successfull because the success
            # callback may add more jobs to the parent batch

            Sidekiq.logger.debug {"Finalize parent complete bid: #{parent_bid}"}
            _, complete, pending, children, failure = Sidekiq.redis do |r|
              r.multi do |pipeline|
                pipeline.sadd("BID-#{parent_bid}-complete", [bid])
                pipeline.scard("BID-#{parent_bid}-complete")
                pipeline.hincrby("BID-#{parent_bid}", "pending", 0)
                pipeline.hincrby("BID-#{parent_bid}", "children", 0)
                pipeline.scard("BID-#{parent_bid}-failed")
              end
            end
            if complete == children && pending == failure
              Batch.enqueue_callbacks(:complete, parent_bid)
            end
          end
        end

        def cleanup_redis bid, callback_bid=nil
          Sidekiq::Batch.cleanup_redis bid
          Sidekiq::Batch.cleanup_redis callback_bid if callback_bid
        end
      end
    end
  end
end