twitter/clockworkraven

View on GitHub
lib/threading.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# Copyright 2012 Twitter, Inc. and others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'thread'

# Basic thread pooling utilities. Processes a shared queue of items in a fixed
# number of threads.
module Threading
  # Sentinel returned by take_item once the work queue is empty. A dedicated
  # object (rather than nil) lets nil and false be processed as ordinary items.
  NO_ITEM = Object.new.freeze
  private_constant :NO_ITEM

  # Processes <items> in a pool of at most <size> threads by calling
  # <processor> with each item as its only argument.
  #
  # Blocks until the pool is done processing and returns an array of the
  # processor's return values. Results are appended as workers finish, so with
  # real threads their order is not guaranteed to match <items>.
  #
  # Each item is attempted at most <retry_count> times in total (a value below
  # 1 still gives one attempt). Resque::Plugins::Status::Killed is never
  # retried.
  #
  # If the processor still raises for an item after its attempts are used up,
  # the queue is cleared so no further items are started, and the exception is
  # re-raised from this method when the failing worker is joined.
  def self.thread_pool(items, size = 16, retry_count = 3, &processor)
    queue = Queue.new
    items.each { |item| queue.push(item) }
    results = []
    # Guards `results`: Array#<< is not guaranteed to be thread-safe on every
    # Ruby implementation.
    results_lock = Mutex.new

    # No point starting more workers than there are items.
    workers = [size, queue.size].min.times.map do
      new_thread do
        begin
          # Non-blocking take: checking `queue.empty?` and then calling a
          # blocking `pop` races with other workers (and with `queue.clear`),
          # which could leave a worker blocked forever and hang the join below.
          until (item = take_item(queue)).equal?(NO_ITEM)
            result = call_with_retries(processor, item, retry_count)
            results_lock.synchronize { results << result }
          end
        rescue
          # Stop the other workers from starting new items, then propagate.
          queue.clear
          raise
        end
      end
    end

    workers.each(&:join)
    results
  end

  # Pops the next item without blocking, or returns NO_ITEM when the queue is
  # empty (including after another worker has cleared it).
  def self.take_item(queue)
    queue.pop(true)
  rescue ThreadError
    NO_ITEM
  end
  private_class_method :take_item

  # Calls processor with item, retrying on StandardError until retry_count
  # attempts have been made in total. Each failure is logged as a warning;
  # the last failure is re-raised.
  def self.call_with_retries(processor, item, retry_count)
    retries = retry_count
    begin
      processor.call(item)
    rescue Resque::Plugins::Status::Killed
      # Don't retry if we get forcibly killed.
      raise
    rescue => e
      # For any other error, retry if we have attempts left.
      Rails.logger.warn("Got an error in thread pool. Retries: #{retries}.\n#{e.inspect}")
      raise unless retries > 1
      retries -= 1
      retry
    end
  end
  private_class_method :call_with_retries

  # Sets the factory used to create new threads. Passing nil restores the
  # default (Ruby's built-in Thread).
  #
  # The factory must respond to .new(&block) by running the block (possibly
  # in the background) and returning an object that responds to .join.
  # join must not return until the block has completed, and must re-raise any
  # exception the block raised; thread_pool relies on this to propagate
  # processor errors. The return value of join is not used.
  #
  # Possible factories include Ruby's built-in Thread class or
  # Threading::FakeThread.
  def self.thread_factory=(factory)
    @factory = factory
  end

  # Returns the current thread factory. Defaults to Thread.
  def self.thread_factory
    @factory || Thread
  end

  # Creates a new thread using the current thread factory and executes <block>
  # in that thread. Returns whatever the factory's .new returns (an object
  # that responds to :join).
  def self.new_thread(&block)
    thread_factory.new(&block)
  end

  # A Thread-like class that just stores the block passed to .new and runs it
  # synchronously, in the caller's thread, when join is called. Exceptions
  # raised by the block propagate out of join.
  class FakeThread
    def initialize(&block)
      @block = block
    end

    # Runs the stored block and returns its result.
    def join
      @block.call
    end
  end
end