Burgestrand/performer

View on GitHub
lib/performer/queue.rb

Summary

Maintainability
A
45 mins
Test Coverage
require "monitor"

class Performer
  # Similar to the stdlib Queue, but with a thread-safe way of closing it down.
  #
  # Items are kept in a plain Array guarded by a Monitor. Consumers waiting
  # in {#deq} are woken through a condition variable when an item is pushed
  # or when the queue is closed.
  class Queue
    def initialize
      @queue = []
      @queue_mutex = Monitor.new
      @queue_cond = @queue_mutex.new_cond
      # Unique sentinel, compared by identity only, so that nil (or any other
      # value) can be a legitimate queue item / argument.
      @undefined = Object.new
      @open = true
    end

    # Push an object into the queue, or yield if not possible.
    #
    # The block is called outside of the queue lock, and only when the
    # queue was already closed at the time of the push.
    #
    # @example pushing an item onto the queue
    #   queue.enq(obj) do
    #     raise "Unable to push #{obj} into queue!"
    #   end
    #
    # @yield if obj could not be pushed onto the queue
    # @param obj
    # @return obj
    # @raise [ArgumentError] if no block given
    def enq(obj)
      raise ArgumentError, "no block given" unless block_given?

      pushed = @queue_mutex.synchronize do
        accepted = try_push(obj)
        @queue_cond.signal
        accepted
      end
      yield unless pushed

      obj
    end

    # Retrieve an object from the queue, or block until one is available.
    #
    # The behaviour is as follows:
    # - empty, open: block until queue is either not empty, or closed
    # - not empty, open: yield an item off the queue, return true
    # - not empty, not open: yield an item off the queue, return false
    # - empty, not open: return false
    #
    # The block is called outside of the queue lock.
    #
    # @example
    #   open = queue.deq do |obj|
    #     # do something with obj
    #   end
    #
    # @yield [obj] an item retrieved from the queue, if available
    # @return [Boolean] true if queue is open, false if closed
    # @raise [ArgumentError] if no block given
    def deq
      raise ArgumentError, "no block given" unless block_given?

      item, was_open = @queue_mutex.synchronize do
        @queue_cond.wait_while { empty? && open? }

        # An empty queue at this point means it was closed while we waited
        # (or before we got here); the sentinel marks "nothing to yield".
        taken = empty? ? undefined : queue.shift
        [taken, open?]
      end

      yield item unless undefined.equal?(item)
      was_open
    end

    # Close the queue, optionally pushing an item onto the queue right before close.
    #
    # All threads waiting in {#deq} are woken up. Closing an already closed
    # queue is allowed; an item given in that case is not pushed and the
    # block is yielded instead.
    #
    # @example close and enqueue
    #   queue.close(object) do
    #     raise "Queue was already closed!"
    #   end
    #
    # @example close without enqueue
    #   queue.close # => no need for block, since no argument
    #
    # @yield if obj could not be pushed onto the queue
    # @param [Object, nil] obj
    # @return [Object, nil] obj, or nil if no obj was given
    # @raise [ArgumentError] if obj given, but no block given
    def close(obj = undefined)
      obj_given = !undefined.equal?(obj)
      raise ArgumentError, "no block given" if obj_given && !block_given?

      pushed = @queue_mutex.synchronize do
        # Push and close under the same lock so that nothing can be
        # enqueued after the final item.
        accepted = obj_given ? try_push(obj) : true
        @open = false
        @queue_cond.broadcast
        accepted
      end

      return nil unless obj_given

      yield unless pushed
      obj
    end

    # Reads the underlying array without taking the queue lock itself, so
    # when called from outside the lock the answer is only a snapshot.
    #
    # @return [Boolean] true if queue is empty
    def empty?
      queue.empty?
    end

    private

    attr_reader :undefined
    attr_reader :queue

    # Append obj if the queue is still open. Callers are expected to hold
    # the queue lock.
    #
    # @param obj
    # @return [Boolean] true if obj was pushed, false if the queue is closed
    def try_push(obj)
      return false unless open?

      queue.push(obj)
      true
    end

    # @return [Boolean] true until the queue has been closed
    def open?
      @open
    end
  end
end