bnorton/micro_q

View on GitHub
lib/micro_q/manager/default.rb

Summary

Maintainability
A
1 hr
Test Coverage
module MicroQ
  module Manager
    ##
    # The default manager implementation.
    # Wrapper for a Queue and a pool of Workers. At each time slice
    # after start! was called, try to dequeue messages from the queue.
    # Perform each message on the worker pool.
    #
    # The pool of workers (more info):
    #   https://github.com/celluloid/celluloid/wiki/Pools
    #
    # The pool manages asynchronously assigning messages to available
    # workers, handles exceptions by restarting the dead actors and
    # is generally a beautiful abstraction on top of a group of linked
    # actors/threads.
    #
    class Default
      include Celluloid

      # Invoke this when the Queue or Worker pool dies
      exit_handler :reinitialize

      attr_reader :queue, :workers, :busy, :current

      def start
        return if MicroQ.queue_only?

        count = workers.size

        if (messages = queue.dequeue(count)).any?
          messages.each do |message|
            work_on(message)
          end
        end

        after(2) { start }
      end

      def work_done(worker)
        message = current.delete(worker)
        queue.finished!(message) if queue.respond_to?(:finished)

        busy.delete(worker)
        workers.push(worker)
      end

      def work_on(message)
        worker = workers.pop
        busy << worker

        current[worker] = message

        worker.perform!(message)
      end

      ##
      # Handle init/death of the Queue or the Worker pool
      # When a worker dies the args are (#<Actor ...>, #<Exception>)
      #
      def reinitialize(*args)
        kill_all and return if self.class.shutdown?

        unless @queue && queue.alive?
          @queue = MicroQ.config.queue.new_link
        end

        @busy ||= []
        @workers ||= []
        @current ||= {}

        if args.any?
          message = current.delete(args.first)
          queue.finished!(message) if queue.respond_to?(:finished)
        end

        build_missing_workers
      end

      alias initialize reinitialize

      # Don't shrink the pool if the config changes
      def build_missing_workers
        return if MicroQ.queue_only?

        workers.select!(&:alive?)
        busy.select!(&:alive?)

        missing_worker_count.times do
          workers << MicroQ.config.worker.new_link(current_actor)
        end
      end

      def missing_worker_count
        [MicroQ.config.workers - (workers.size + busy.size), 0].max
      end

      def kill_all
        (workers + busy).each {|w| w.terminate if w.alive? }
      end

      def self.shutdown?
        !!@shutdown
      end

      def self.shutdown!
        @shutdown = true
      end
    end
  end
end