airbrake/airbrake-ruby

View on GitHub
lib/airbrake-ruby/thread_pool.rb

Summary

Maintainability
A
50 mins
Test Coverage
module Airbrake
  # ThreadPool implements a simple thread pool that can configure the number of
  # worker threads and the size of the queue to process.
  #
  # @example
  #   # Initialize a new thread pool with 5 workers and a queue size of 100. Set
  #   # the block to be run concurrently.
  #   thread_pool = ThreadPool.new(
  #     name: 'performance-notifier',
  #     worker_size: 5,
  #     queue_size: 100,
  #     block: proc { |message| print "ECHO: #{message}..."}
  #   )
  #
  #   # Send work.
  #   10.times { |i| thread_pool << i }
  #   #=> ECHO: 0...ECHO: 1...ECHO: 2...
  #
  # @api private
  # @since v4.6.1
  class ThreadPool
    include Loggable

    # @return [ThreadGroup] the list of workers
    # @note This is exposed for eaiser unit testing
    attr_reader :workers

    def initialize(worker_size:, queue_size:, block:, name: nil)
      @name = name
      @worker_size = worker_size
      @queue_size = queue_size
      @block = block

      @queue = SizedQueue.new(queue_size)
      @workers = ThreadGroup.new
      @mutex = Mutex.new
      @pid = nil
      @closed = false

      has_workers?
    end

    # Adds a new message to the thread pool. Rejects messages if the queue is at
    # its capacity.
    #
    # @param [Object] message The message that gets passed to the block
    # @return [Boolean] true if the message was successfully sent to the pool,
    #   false if the queue is full
    def <<(message)
      if backlog >= @queue_size
        logger.info do
          "#{LOG_LABEL} ThreadPool has reached its capacity of " \
          "#{@queue_size} and the following message will not be " \
          "processed: #{message.inspect}"
        end
        return false
      end

      @queue << message
      true
    end

    # @return [Integer] how big the queue is at the moment
    def backlog
      @queue.size
    end

    # Checks if a thread pool has any workers. A thread pool doesn't have any
    # workers only in two cases: when it was closed or when all workers
    # crashed. An *active* thread pool doesn't have any workers only when
    # something went wrong.
    #
    # Workers are expected to crash when you +fork+ the process the workers are
    # living in. In this case we detect a +fork+ and try to revive them here.
    #
    # Another possible scenario that crashes workers is when you close the
    # instance on +at_exit+, but some other +at_exit+ hook prevents the process
    # from exiting.
    #
    # @return [Boolean] true if an instance wasn't closed, but has no workers
    # @see https://goo.gl/oydz8h Example of at_exit that prevents exit
    def has_workers?
      @mutex.synchronize do
        return false if @closed

        if @pid != Process.pid && @workers.list.empty?
          @pid = Process.pid
          @workers = ThreadGroup.new
          spawn_workers
        end

        !@closed && @workers.list.any?
      end
    end

    # Closes the thread pool making it a no-op (it shut downs all worker
    # threads). Before closing, waits on all unprocessed tasks to be processed.
    #
    # @return [void]
    # @raise [Airbrake::Error] when invoked more than one time
    def close
      threads = @mutex.synchronize do
        raise Airbrake::Error, 'this thread pool is closed already' if @closed

        unless @queue.empty?
          msg = "#{LOG_LABEL} waiting to process #{@queue.size} task(s)..."
          logger.debug("#{msg} (Ctrl-C to abort)")
        end

        @worker_size.times { @queue << :stop }
        @closed = true
        @workers.list.dup
      end

      threads.each(&:join)
      logger.debug("#{LOG_LABEL} #{@name} thread pool closed")
    end

    def closed?
      @closed
    end

    def spawn_workers
      @worker_size.times { @workers.add(spawn_worker) }
    end

    private

    def spawn_worker
      Thread.new do
        while (message = @queue.pop)
          break if message == :stop

          @block.call(message)
        end
      end
    end
  end
end