brandonhilkert/sucker_punch

View on GitHub
lib/sucker_punch/queue.rb

Summary

Maintainability
A
45 mins
Test Coverage
require 'forwardable'

module SuckerPunch
  class Queue < Concurrent::Synchronization::LockableObject
    extend Forwardable
    include Concurrent::ExecutorService

    DEFAULT_MAX_QUEUE_SIZE = 0 # Unlimited

    DEFAULT_EXECUTOR_OPTIONS = {
      min_threads:     2,
      max_threads:     2,
      idletime:        60, # 1 minute
      auto_terminate:  false # Let shutdown modes handle thread termination
    }.freeze

    QUEUES = Concurrent::Map.new

    def self.find_or_create(name, num_workers = 2, num_jobs_max = nil)
      pool = QUEUES.fetch_or_store(name) do
        options = DEFAULT_EXECUTOR_OPTIONS
          .merge(
            min_threads: num_workers,
            max_threads: num_workers,
            max_queue: num_jobs_max || DEFAULT_MAX_QUEUE_SIZE
          )
        Concurrent::ThreadPoolExecutor.new(**options)
      end

      new(name, pool)
    end

    def self.all
      queues = Concurrent::Array.new
      QUEUES.each_pair do |name, pool|
        queues.push new(name, pool)
      end
      queues
    end

    def self.clear
      # susceptible to race conditions--only use in testing
      old = all
      QUEUES.clear
      SuckerPunch::Counter::Busy.clear
      SuckerPunch::Counter::Processed.clear
      SuckerPunch::Counter::Failed.clear
      old.each { |queue| queue.kill }
    end

    def self.stats
      queues = {}

      all.each do |queue|
        queues[queue.name] = {
          "workers" => {
            "total" => queue.total_workers,
            "busy" => queue.busy_workers,
            "idle" => queue.idle_workers,
          },
          "jobs" => {
            "processed" => queue.processed_jobs,
            "failed" => queue.failed_jobs,
            "enqueued" => queue.enqueued_jobs,
          }
        }
      end

      queues
    end

    PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5

    def self.shutdown_all
      deadline = Time.now + SuckerPunch.shutdown_timeout

      if SuckerPunch::RUNNING.make_false
        # If a job is enqueued right before the script exits
        # (command line, rake task, etc.), the system needs an
        # interval to allow the enqueue jobs to make it in to the system
        # otherwise the queue will look idle
        sleep PAUSE_TIME

        queues = all

        # Issue shutdown to each queue and let them wrap up their work. This
        # prevents new jobs from being enqueued and lets the pool clean up
        # after itself
        queues.each { |queue| queue.shutdown }

        # return if every queue is empty and workers in every queue are idle
        return if queues.all? { |queue| queue.idle? }

        SuckerPunch.logger.info("Pausing to allow workers to finish...")

        remaining = deadline - Time.now

        # Continue to loop through each queue and test if it's idle, while
        # respecting the shutdown timeout
        while remaining > PAUSE_TIME
          return if queues.all? { |queue| queue.idle? }
          sleep PAUSE_TIME
          remaining = deadline - Time.now
        end

        # Queues haven't finished work. Aggressively kill them.
        SuckerPunch.logger.warn("Queued jobs didn't finish before shutdown_timeout...killing remaining jobs")
        queues.each { |queue| queue.kill }
      end
    end
    
    def self.wait
      queues = all
      
      # return if every queue is empty and workers in every queue are idle
      return if queues.all? { |queue| queue.idle? }

      SuckerPunch.logger.info("Pausing to allow workers to finish...")

      while queues.any? { |queue| !queue.idle? }
        sleep PAUSE_TIME
      end
    end

    attr_reader :name

    def_delegators :@pool,
      :max_length,
      :min_length,
      :max_queue,
      :length,
      :queue_length,
      :wait_for_termination#,
      #:idletime,
      #:max_queue,
      #:largest_length,
      #:scheduled_task_count,
      #:completed_task_count,
      #:can_overflow?,
      #:remaining_capacity,
      #:running?,
      #:shuttingdown?

    alias_method :total_workers, :length
    alias_method :enqueued_jobs, :queue_length

    def initialize(name, pool)
      super()
      @running = true
      @name, @pool = name, pool
    end

    def running?
      synchronize { @running }
    end

    def idle?
      enqueued_jobs == 0 && busy_workers == 0
    end

    def ==(other)
      pool == other.pool
    end

    def busy_workers
      SuckerPunch::Counter::Busy.new(name).value
    end

    def idle_workers
      total_workers - busy_workers
    end

    def processed_jobs
      SuckerPunch::Counter::Processed.new(name).value
    end

    def failed_jobs
      SuckerPunch::Counter::Failed.new(name).value
    end

    def post(*args, &block)
      synchronize do
        if @running
          @pool.post(*args, &block)
        else
          false
        end
      end
    end
    ruby2_keywords(:post) if respond_to?(:ruby2_keywords, true)

    def kill
      @pool.kill
    end

    def shutdown
      synchronize { @running = false }
      @pool.shutdown
    end

    protected

    def pool
      @pool
    end
  end
end