lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb
module Concurrent
module Collection
# @!macro priority_queue
#
# @!visibility private
# @!macro internal_implementation_note
class RubyNonConcurrentPriorityQueue
  # @!macro priority_queue_method_initialize
  #
  # Reads `opts[:order]` (default `:max`): `:min` or `:low` keeps the lowest
  # element at the head; any other value keeps the highest element there.
  def initialize(opts = {})
    order = opts.fetch(:order, :max)
    # The value `parent <=> child` must yield for a pair to count as ordered.
    @comparator = [:min, :low].include?(order) ? -1 : 1
    clear
  end

  # @!macro priority_queue_method_clear
  #
  # Resets the backing array to a lone `nil` placeholder at index 0, so the
  # heap is 1-based and the parent of slot `k` is slot `k / 2`.
  #
  # @return [true] always
  def clear
    @queue = [nil]
    @length = 0
    true
  end

  # @!macro priority_queue_method_delete
  #
  # Removes every element that is `==` to `item`.
  #
  # @param [Object] item the value to remove
  # @return [Boolean] true if at least one element was removed, else false
  def delete(item)
    return false if empty?

    original_length = @length
    k = 1
    while k <= @length
      if @queue[k] == item
        # Discard matching leaves at the tail first: dropping the last leaf
        # never disturbs the heap, and it guarantees that the element moved
        # into slot k below is not itself a match that could swim above k
        # and escape the scan.
        remove_last while @length > k && @queue[@length] == item
        swap(k, @length)
        remove_last
        # When the match occupied the last slot there is nothing left at k
        # to re-position; otherwise move the replacement down or up as
        # needed. k is deliberately not advanced, so whatever now sits in
        # slot k is examined on the next pass.
        (sink(k) || swim(k)) if k <= @length
      else
        k += 1
      end
    end
    @length != original_length
  end

  # @!macro priority_queue_method_empty
  def empty?
    size == 0
  end

  # @!macro priority_queue_method_include
  #
  # @param [Object] item the value to look for
  # @return [Boolean] true if an element `==` to `item` is queued
  def include?(item)
    # Slot 0 of the backing array is a nil placeholder, so nil has to be
    # rejected explicitly; #push never lets nil into the queue.
    !item.nil? && @queue.include?(item)
  end
  alias_method :has_priority?, :include?

  # @!macro priority_queue_method_length
  def length
    @length
  end
  alias_method :size, :length

  # @!macro priority_queue_method_peek
  #
  # @return [Object, nil] the head element without removing it, or nil when
  #   the queue is empty
  def peek
    empty? ? nil : @queue[1]
  end

  # @!macro priority_queue_method_pop
  #
  # @return [Object, nil] the removed head element, or nil when the queue is
  #   empty
  def pop
    return nil if empty?

    head = @queue[1]
    # Move the last leaf to the root, drop the old head from the tail, then
    # let the new root sink to its place.
    swap(1, @length)
    remove_last
    sink(1)
    head
  end
  alias_method :deq, :pop
  alias_method :shift, :pop

  # @!macro priority_queue_method_push
  #
  # @param [Object] item the value to enqueue
  # @raise [ArgumentError] if `item` is nil
  # @return [true] always
  def push(item)
    raise ArgumentError, 'cannot enqueue nil' if item.nil?

    @length += 1
    @queue << item
    swim(@length)
    true
  end
  alias_method :<<, :push
  alias_method :enq, :push

  # @!macro priority_queue_method_from_list
  #
  # @param [#each] list the items to enqueue, pushed one at a time
  # @param [Hash] opts options handed to the constructor
  # @return [RubyNonConcurrentPriorityQueue] the populated queue
  def self.from_list(list, opts = {})
    queue = new(opts)
    list.each { |item| queue << item }
    queue
  end

  private

  # Drop the last slot of the internal array and shrink the logical length.
  #
  # @!visibility private
  def remove_last
    @queue.pop
    @length -= 1
  end

  # Exchange the values at the given indexes within the internal array.
  #
  # @param [Integer] x the first index to swap
  # @param [Integer] y the second index to swap
  #
  # @!visibility private
  def swap(x, y)
    @queue[x], @queue[y] = @queue[y], @queue[x]
  end

  # Are the items at the given indexes ordered based on the priority
  # order specified at construction? A pair that compares as equal
  # (`<=>` returns 0) is reported as not ordered.
  #
  # @param [Integer] x the first index from which to retrieve a comparable value
  # @param [Integer] y the second index from which to retrieve a comparable value
  #
  # @return [Boolean] true if the two elements are in the correct priority order
  #   else false
  #
  # @!visibility private
  def ordered?(x, y)
    (@queue[x] <=> @queue[y]) == @comparator
  end

  # Percolate down to maintain heap invariant.
  #
  # @param [Integer] k the index at which to start the percolation
  #
  # @return [Boolean] true if at least one swap was made
  #
  # @!visibility private
  def sink(k)
    success = false
    while (j = (2 * k)) <= @length
      # Pick the child that should sit higher of the two.
      j += 1 if j < @length && !ordered?(j, j + 1)
      break if ordered?(k, j)

      swap(k, j)
      success = true
      k = j
    end
    success
  end

  # Percolate up to maintain heap invariant.
  #
  # @param [Integer] k the index at which to start the percolation
  #
  # @return [Boolean] true if at least one swap was made
  #
  # @!visibility private
  def swim(k)
    success = false
    while k > 1 && !ordered?(k / 2, k)
      swap(k, k / 2)
      k /= 2
      success = true
    end
    success
  end
end
end
end