adtile/aggregator

View on GitHub
lib/aggregator.rb

Summary

Maintainability
A
1 hr
Test Coverage
require "thread"
require "singleton"
require "logger"

# Singleton background batcher.
#
# Items handed to {Aggregator.push} are placed on an in-memory queue and
# consumed by a single lazily started worker thread. The worker folds items
# into a "collection" through the {#process} hook and hands the collection to
# the {#finish} hook once a batch is complete. A batch ends when
# +max_batch_size+ items were processed or +max_wait_time+ seconds elapsed.
#
# Subclasses must implement the private hooks +process(collection, item)+ and
# +finish(collection)+; the versions defined here only raise.
class Aggregator
  include Singleton

  # Only writers are generated here; the readers are private methods further
  # down that fall back to a default when nothing was configured.
  attr_writer :max_batch_size, :max_wait_time, :logger

  # Enqueues +data+ on the singleton instance.
  #
  # @param data [Object] item later passed to +process+
  def self.push(data)
    instance.push(data)
  end

  # @param value [Integer] maximum number of items per batch
  def self.max_batch_size=(value)
    instance.max_batch_size = value
  end

  # @param value [Numeric] maximum duration of one batch, in seconds
  def self.max_wait_time=(value)
    instance.max_wait_time = value
  end

  # @param logger [Logger] object receiving the aggregator's log messages
  def self.logger=(logger)
    instance.logger = logger
  end

  # Flushes the singleton instance; see {#drain}.
  #
  # @return [true]
  def self.drain
    instance.drain
  end

  def initialize
    @queue = Queue.new
    @mutex = Mutex.new
    @thread = nil

    # Flush whatever the worker still holds when the process exits.
    at_exit { stop }
  end

  # Enqueues +data+ and makes sure a worker thread is running.
  #
  # @param data [Object] item later passed to +process+
  def push(data)
    @queue.push(data)
    start unless running?
  end

  # Blocks until the worker has consumed the queue, flushed its current batch
  # through +finish+ and terminated.
  #
  # The worker is always joined when it is alive, even if the queue is
  # already empty: it may have popped items that were not handed to +finish+
  # yet, and returning early would both lose that guarantee and leave the
  # thread polling forever after the reference to it is dropped.
  #
  # @return [true]
  def drain
    # Items may be left over without a live worker (e.g. after a crash).
    start if !running? && !@queue.empty?

    worker = @thread
    if worker && worker.alive?
      log :info, "joining thread #{worker.inspect} (queue length = #{@queue.length})"
      @drain = true
      worker.join
      log :info, "stopping thread #{worker.inspect} (queue length = #{@queue.length})"
    end

    @thread = nil
    true
  end

  private

  # @return [Integer] configured batch size, 1000 when unset
  def max_batch_size
    @max_batch_size || 1000
  end

  # @return [Numeric] configured batch duration in seconds, 1 when unset
  def max_wait_time
    @max_wait_time || 1
  end

  # Hook: fold +item+ into +collection+ and return the updated collection.
  # +collection+ is nil for the first item of a batch.
  #
  # @raise [NoMethodError] always, unless overridden by a subclass
  def process(collection, item)
    raise NoMethodError,
      "#{self.class.name}#process(collection, item) must be implemented"
  end

  # Hook: called with the collection of a completed batch.
  #
  # @raise [NoMethodError] always, unless overridden by a subclass
  def finish(collection)
    raise NoMethodError,
      "#{self.class.name}#finish(collection) must be implemented"
  end

  # @return [Boolean, nil] truthy while the worker thread exists and is alive
  def running?
    @thread && @thread.alive?
  end

  # @return [Logger] the configured logger, or a logger writing to STDOUT
  def logger
    @logger ||= Logger.new(STDOUT)
  end

  # Sends +message+, prefixed with the class name, to the logger at +level+.
  def log(level, message)
    logger.send(level, "[#{self.class.name}] #{message}")
  end

  # @return [Boolean] true once a drain was requested and the queue is empty
  def drained?
    @queue.empty? && @drain
  end

  # Runs one batch. Raises StopIteration (which terminates the enclosing
  # +loop+ in the worker) once a drain was requested and the queue is empty.
  # The +ensure+ clause flushes a partially built batch on every exit path.
  def process_queue
    collection = nil
    raise StopIteration if drained?

    processed_items = 0
    start_time = Time.now

    while processed_items < max_batch_size && (Time.now - start_time) < max_wait_time
      raise StopIteration if drained?

      if @queue.empty?
        sleep 0.1
      else
        # Non-blocking pop: the emptiness check above was made by this same
        # thread, which is the only one that pops.
        collection = process(collection, @queue.pop(true))
        processed_items += 1
      end
    end
  ensure
    finish(collection) if collection
  end

  # Starts the worker thread unless one is already alive.
  #
  # @return [Thread, false] the new thread, or false when already running
  def start
    @mutex.synchronize do
      return false if running?

      @drain = false

      @thread = Thread.new do
        begin
          log :info, "starting thread #{Thread.current}"

          loop { process_queue }
        rescue StandardError => e
          # StandardError only: SystemExit, NoMemoryError and the like must
          # not be swallowed here.
          log :warn, "thread crashed with exception: #{e.inspect}"
        end
      end

      @thread.priority = 2

      @thread
    end
  end

  # Drains the worker if it is running.
  #
  # @return [Boolean] true after draining, false when no worker was running
  def stop
    if running?
      drain
    else
      log :info, "thread not running - nothing to stop"
      false
    end
  end
end