lib/pione/patch/rinda-patch.rb

Summary

Maintainability
F
3 days
Test Coverage
# @api private
module Rinda
  class RedundantTupleError < StandardError
    def initialize(tuple)
      @tuple = tuple
    end

    def message
      "Try to write redundant tuple in tuple space: %s" % @tuple.inspect
    end

    def inspect
      "#<Rinda::RedundantTupleError @tuple=%s>" % @tuple.inspect
    end
    alias :to_s :inspect
  end

  class Tuple
    def ==(other)
      return false unless other.kind_of?(Tuple)
      value == other.value
    end
    alias :eql? :"=="

    def init_with_ary(ary)
      @tuple = Array.new(ary.size)
      # add timestamp
      @tuple.timestamp = ary.timestamp
      @tuple.size.times do |i|
        @tuple[i] = ary[i]
      end
    end
  end

  class TupleEntry
    def ==(other)
      return false unless other.kind_of?(TupleEntry)
      value == other.value
    end
    alias :eql? :"=="
  end

  class WaitTemplateEntry
    attr_reader :place
    attr_reader :thread
    attr_reader :found
    attr_accessor :signaled
    attr_accessor :finished

    # @note
    #   removed monitor from original
    def initialize(place, ary, expires=nil)
      super(ary, expires)
      @place = place
      @found = nil
      @signaled = false
      @finished = false
    end

    # @note
    #   thread version(don't use monitor)
    def wait
      @thread = Thread.current
      Thread.stop
      @thread = nil
    end

    # @note
    #   thread version(don't use monitor)
    def signal
      @signaled = true
      if @thread && @thread.status == "sleep"
        @thread.run
      end
    end

    def inspect
      infos = [
        "@cancel=%s" % @cancel,
        "@expires=%s" % @expires,
        "@tuple=%s" % @tuple.inspect,
        "@renewer=%s" % @renewer.inspect,
        "@found=%s" % @found.inspect,
        "@thread=%s" % @thread.inspect,
        "@signaled=%s" % @signaled,
        "@finished=%s" % @finished
      ]
      "#<%s:%s %s>" % ["Rinda::WaitTemplateEntry", __id__, infos.join(", ")]
    end
    alias :to_s :inspect

    def ==(other)
      return false unless other.kind_of?(WaitTemplateEntry)
      return false unless value == other.value
      return false unless @thread == other.thread
      return false unless @signaled == other.signaled
      return false unless @finished == other.finished
      return true
    end
    alias :eql? :"=="
  end

  class TupleBag
    # TupleBin is original array based class.
    class TupleBin
      def elements
        @bin
      end

      def size
        elements.size
      end

      def delete(tuple)
        if i = @bin.index(tuple)
          @bin.delete_at(i)
        end
      end
    end

    # DomainTupleBin is a domain based TupleBin class.
    # @note
    #   DomainTupleBin should take tuples that have domain.
    class DomainTupleBin < TupleBin
      # Creates a new bin.
      def initialize
        @bin = {}
      end

      # Add the tuple into the tuple space.
      #
      # @param [Array] tuple
      #   the tuple
      # @return [void]
      def add(tuple)
        if dom = domain(tuple)
          @bin[dom] = tuple
        else
          raise RuntimeError
        end
      end

      # Deletes the tuple.
      # @param [Array] tuple
      #   the tuple
      # @return [void]
      def delete(tuple)
        @bin.delete(domain(tuple))
      end

      # Deletes tuples that match the block.
      # @yield [Array]
      #   each tuple
      # @return [void]
      def delete_if
        return @bin unless block_given?
        @bin.delete_if {|key, val| yield(val)}
      end

      def elements
        @bin.values
      end

      # Finds a tuple matched by the template. This method searches by index
      # when the template has the domain, otherwise by liner.
      # @param [TemplateEntry] template
      #   template tuple
      # @yield [Array]
      #   match condition block
      # @return [Array]
      #   a matched tuple
      def find(template, &b)
        if key = domain(template)
          # indexed search
          return @bin[key]
        else
          # liner search
          return @bin.values.find {|val| yield(val)}
        end
      end

      # Finds all tuples matched by the template. This method searches by index
      # when the template has the domain, otherwise by liner.
      # @param [TemplateEntry] template
      #   template tuple
      # @yield [Array]
      #   match condition block
      # @return [Array<Array>]
      #   matched tuples
      def find_all(template, &b)
        return @bin.values unless block_given?
        if key = domain(template)
          # indexed search
          return [@bin[key]]
        else
          # liner search
          return @bin.select{|_, val| yield(val)}.values
        end
      end

      # Returns an iterator of the values.
      # @return [Enumerator]
      #   iterator of the values
      def each(*args)
        @bin.values.each(*args)
      end

      private

      # Returns domain position.
      # @param [Array] tuple
      #   the tuple
      # @return [String]
      #   the domain
      def domain(tuple)
        identifier = tuple.value[0]
        pos = Pione::TupleSpace[identifier].domain_position
        tuple.value[pos]
      end
    end

    # DataTupleBin is a set of domains.
    class DataTupleBin < TupleBin
      def initialize
        @bin = {}
      end

      def elements
        @bin.values.map{|val| val.values}.flatten
      end

      # Adds the tuple.
      # @param [Array] tuple
      #   the tuple
      # @return [void]
      def add(tuple)
        prepare_table(domain(tuple))
        @bin[domain(tuple)][name(tuple)] = tuple
      end

      def delete(tuple)
        prepare_table(domain(tuple))
        @bin[domain(tuple)].delete(name(tuple))
      end

      def delete_if
        if block_given?
          @bin.values.each do |table|
            table.delete_if {|_, val| yield(val)}
          end
        end
        return @bin
      end

      def find(template, &b)
        domain = domain(template)
        name = name(template)
        prepare_table(domain)

        if domain
          @bin[domain].values.each do |tuple|
            return tuple if yield(tuple)
          end
        else
          @bin.values.each do |table|
            table.values.each do |tuple|
              return tuple if yield(tuple)
            end
          end
        end
        return nil
      end

      def find_all(template, &b)
        domain = domain(template)
        name = name(template)
        prepare_table(domain)

        if domain
          if block_given?
            return @bin[domain].values.select {|tuple| yield(tuple)}
          else
            return @bin[domain].values
          end
        else
          if block_given?
            return @bin.values.map{|table| table.values}.flatten.select{|tuple|
              yield(tuple)
            }
          else
            return @bin.values.map{|table| table.values}.flatten
          end
        end
      end

      def each(*args)
        @bin.values.map{|table| table.values}.flatten.each(*args)
      end

      private

      def prepare_table(domain)
        if domain
          @bin[domain] = {} unless @bin[domain]
        end
      end

      # Returns the domain.
      def domain(tuple)
        return tuple.value[1]
      end

      # Returns the name.
      def name(tuple)
        return tuple.value[2]
      end
    end

    # HashTupleBin is a hash based bin class.
    class HashTupleBin
      def initialize
        @bin = {}
      end

      def elements
        @bin.values
      end

      def add(tuple)
        unless @bin[key(tuple)]
          @bin[key(tuple)] = tuple
        else
          raise RedundantErrorTuple.new(tuple)
        end
      end

      def delete(tuple)
        @bin.delete(key(tuple))
      end

      def delete_if
        if block_given?
          @bin.delete_if {|_, val| yield(val)}
        end
        return @bin
      end

      def find(template, &b)
        if key(template) && @bin.has_key?(key(template))
          tuple = @bin[key(template)]
          return tuple if yield(tuple)
        else
          @bin.values.each do |tuple|
            return tuple if yield(tuple)
          end
        end
        return nil
      end

      def find_all(template, &b)
        if key(template) && @bin.has_key?(key(template))
          tuple = @bin[key(template)]
          return tuple if yield(tuple)
        else
          return @bin.values.find_all {|tuple|
            yield(tuple)
          }
        end
      end

      def each(*args)
        @bin.values.each(*args)
      end

      private

      # Returns the key.
      def key(tuple)
        # 0:identifier, 1:key, 2:value
        return tuple.value[1]
      end
    end

    def initialize
      @hash = {}
      @mutex = Mutex.new
      @enum = enum_for(:each_entry)
      @special_bin = {}
    end

    def [](ident)
      @hash[ident]
    end

    # Sets special bin class table by identifier.
    def set_special_bin(special_bin)
      @special_bin = special_bin
    end

    def push(tuple)
      key = bin_key(tuple)
      prepare_table(key)
      @mutex.synchronize {@hash[key].add(tuple)}
    end

    def delete(tuple)
      key = bin_key(tuple)
      bin = @mutex.synchronize {@hash[key]}
      return nil unless bin
      @mutex.synchronize {bin.delete(tuple)}
      @mutex.synchronize {@hash.delete(key) if bin.empty?}
      return tuple
    end

    def prepare_table(key)
      unless @mutex.synchronize {@hash[key]}
        @mutex.synchronize {@hash[key] = bin_class(key).new}
      end
    end

    def bin_class(key)
      return TupleBin unless @special_bin
      return @special_bin.has_key?(key) ? @special_bin[key] : TupleBin
    end

    alias :orig_find :find
    alias :orig_find_all :find_all

    public

    # Returns all tuples in the bag.
    def all_tuples
      @mutex.synchronize{@hash.values}.map{|bin| bin.elements}.flatten
    end

    def find(template)
      key = bin_key(template)
      if @special_bin[key]
        prepare_table(key)
        @mutex.synchronize{@hash[key]}.find(template) do |tuple|
          tuple.alive? && template.match(tuple)
        end
      else
        orig_find(template)
      end
    end

    def find_all(template)
      key = bin_key(template)
      if @special_bin[key]
        prepare_table(key)
        @mutex.synchronize{@hash[key]}.find_all(template) do |tuple|
          tuple.alive? && template.match(tuple)
        end
      else
        orig_find_all(template)
      end
    end

    def find_template(tuple)
      @enum.find do |template|
        template.alive? && template.match(tuple)
      end
    end

    def delete_unless_alive
      deleted = []
      @mutex.synchronize do
        @hash.each do |key, bin|
          bin.delete_if do |tuple|
            if tuple.alive?
              false
            else
              deleted.push(tuple)
              true
            end
          end
        end
      end
      deleted
    end

    def task_size
      @mutex.synchronize{@hash[:task]}.size rescue 0
    end

    def working_size
      @mutex.synchronize{@hash[:working]}.size rescue 0
    end

    def finished_size
      @mutex.synchronize{@hash[:finished]}.size rescue 0
    end

    def data_size
      @mutex.synchronize{@hash[:data]}.size rescue 0
    end

    private

    def each_entry(&blk)
      @mutex.synchronize do
        @hash.each do |k, v|
          v.each(&blk)
        end
      end
    end
  end

  class TupleSpace
    attr_reader :bag
    attr_reader :take_waiter
    attr_reader :read_waiter

    alias :orig_initialize :initialize

    def initialize(*args)
      orig_initialize(*args)
      @bag.set_special_bin(
        :task => TupleBag::DomainTupleBin,
        :finished => TupleBag::DomainTupleBin,
        :working => TupleBag::DomainTupleBin,
        :data => TupleBag::DataTupleBin,
        :lift => TupleBag::HashTupleBin
      )
      @mutex = Mutex.new
    end

    def write(tuple, *args)
      tuple.timestamp = Time.now
      real_write(tuple, *args)
    end

    def move(port, tuple, sec=nil)
      real_move(port, tuple, sec)
    end

    def read(tuple, sec=nil)
      lift_tuple(real_read(tuple, sec))
    end

    def read_all(tuple)
      real_read_all(tuple).map do |res|
        lift_tuple(res)
      end
    end

    def take_all(tuple, sec=nil)
      real_take_all(tuple, sec).map {|res| lift_tuple(res)}
    end
    public :take_all

    # Returns all tuples in the space.
    # @param [Symbol] target
    #   tuple type(:all, :bag, :read_waiter, or :take_waiter)
    # @return [Array]
    #   all tuples
    def all_tuples(target=:bag)
      case target
      when :all
        all_tuples(:bag) + all_tuples(:read_waiter) + all_tuples(:take_waiter)
      when :bag
        @mutex.synchronize{@bag.all_tuples}.map{|tuple| tuple.value}
      when :read_waiter
        @mutex.synchronize{@read_waiter.all_tuples}.map{|tuple| tuple.value}
      when :take_waiter
        @mutex.synchronize{@take_waiter.all_tuples}.map{|tuple| tuple.value}
      end
    end

    # @note
    #   mutex version of +notify+
    def notify(event, tuple, sec=nil)
      template = NotifyTemplateEntry.new(self, event, tuple, sec)
      @mutex.synchronize {@notify_waiter.push(template)}
      template
    end

    def task_size
      @bag.task_size
    end

    def working_size
      @bag.working_size
    end

    def finished_size
      @bag.finished_size
    end

    def data_size
      @bag.data_size
    end

    private

    # @note
    #   mutex version of +write+
    def real_write(tuple, sec=nil)
      entry = create_entry(tuple, sec)
      if entry.expired?
        # why only read_waiter???
        @mutex.synchronize{@read_waiter.find_all_template(entry)}.each do |template|
          template.read(tuple)
        end
        notify_event('write', entry.value)
        notify_event('delete', entry.value)
      else
        # push to bag
        @mutex.synchronize do
          @bag.push(entry)
        end
        # start keeper
        start_keeper if entry.expires
        # send tuple to all matched waiters in read waiter list
        @mutex.synchronize do
          @read_waiter.find_all_template(entry).each do |template|
            template.read(tuple)
          end
        end
        # send tuple to one of matched waiters in take waiter list
        @mutex.synchronize do
          if template = @take_waiter.find_template(entry)
            template.signal
          end
        end
        notify_event('write', entry.value)
      end
      entry
    end

    # @note
    #   mutex version of +move+
    def real_move(port, tuple, sec=nil)
      template = WaitTemplateEntry.new(self, tuple, sec)
      yield(template) if block_given?

      if entry = @mutex.synchronize {@bag.find(template)}
        port.push(entry.value) if port
        @mutex.synchronize {@bag.delete(entry)}
        notify_event('take', entry.value)
        template.finished = true
        return entry.value
      end
      raise RequestExpiredError if template.expired?

      begin
        @mutex.synchronize {@take_waiter.push(template)}
        start_keeper if template.expires
        while true
          raise RequestCanceledError if template.canceled?
          raise RequestExpiredError if template.expired?
          if entry = @mutex.synchronize {@bag.find(template)}
            port.push(entry.value) if port
            @mutex.synchronize {@bag.delete(entry)}
            notify_event('take', entry.value)
            template.finished = true
            @mutex.synchronize do
              @take_waiter.delete(template)
            end
            return entry.value
          end
          Thread.current[:WaitTemplate] = template
          template.wait
          Thread.current[:WaitTemplate] = nil
        end
      ensure
        @mutex.synchronize {@take_waiter.delete(template)}
      end
    end

    # @note
    #   mutex version of +read+
    def real_read(tuple, sec=nil)
      template = WaitTemplateEntry.new(self, tuple, sec)
      yield(template) if block_given?

      entry = @mutex.synchronize {@bag.find(template)}
      return entry.value if entry
      raise RequestExpiredError if template.expired?

      begin
        @mutex.synchronize {@read_waiter.push(template)}
        start_keeper if template.expires
        template.wait
        raise RequestCanceledError if template.canceled?
        raise RequestExpiredError if template.expired?
        return template.found
      ensure
        @mutex.synchronize {@read_waiter.delete(template)}
      end
    end

    # @note
    #   mutex version of +read_all+
    def real_read_all(tuple)
      template = WaitTemplateEntry.new(self, tuple, nil)
      entry = @mutex.synchronize {@bag.find_all(template)}
      entry.collect {|e| e.value}
    end

    # @note
    #   mutex version of +read_all+
    def real_take_all(tuple, sec=nil)
      template = WaitTemplateEntry.new(self, tuple, sec)
      yield(template) if block_given?

      entries = @mutex.synchronize {@bag.find_all(template)}
      unless entries.empty?
        entries.each do |entry|
          # port.push(entry.value) if port
          @mutex.synchronize {@bag.delete(entry)}
        end
        template.finished = true
        return entries.map {|entry| entry.value}
      end
      raise RequestExpiredError if template.expired?

      begin
        @mutex.synchronize {@take_waiter.push(template)}
        start_keeper if template.expires
        while true
          raise RequestCanceledError if template.canceled?
          raise RequestExpiredError if template.expired?
          entries = @mutex.synchronize {@bag.find_all(template)}
          unless entries.empty?
            entries.each do |entry|
              # port.push(entry.value) if port
              @mutex.synchronize {@bag.delete(entry)}
            end
            template.finished = true
            @mutex.synchronize {@take_waiter.delete(template)}
            return entries.map {|entry| entry.value}
          end
          Thread.current[:WaitTemplate] = template
          template.wait
          Thread.current[:WaitTemplate] = nil
        end
      ensure
        @mutex.synchronize {@take_waiter.delete(template)}
      end
    end

    # @note
    #   mutex version of +keep_clean+
    def keep_clean
      @mutex.synchronize{@read_waiter.delete_unless_alive}.each do |e|
        e.signal
      end
      @mutex.synchronize{@take_waiter.delete_unless_alive}.each do |e|
        e.signal
      end
      @mutex.synchronize{@notify_waiter.delete_unless_alive}.each do |e|
        e.notify(['close'])
      end
      @mutex.synchronize{@bag.delete_unless_alive}.each do |e|
        notify_event('delete', e.value)
      end
    end

    # @note
    #   mutex version of +start_keeper+
    def start_keeper
      return if @keeper && @keeper.alive?
      @keeper = Thread.new do
        while true
          sleep(@period)
          break unless need_keeper?
          keep_clean
        end
      end
    end

    # Lift the location of the tuple.
    #
    # @param tuple [BasicTuple]
    #   tuple
    # @return [BasicTuple]
    #   lifted tuple
    def lift_tuple(tuple)
      if Pione::TupleSpace[tuple.first]
        if pos = Pione::TupleSpace[tuple.first].location_position
          if new_location = lift_location(tuple[pos])
            tuple = tuple.clone
            tuple[pos] = new_location
          end
        end
      end
      return tuple
    end

    # Lift the location.
    #
    # @param location [Location::BasicLocation]
    #   location that lift from
    # @param history [Array<Location::BasicLocation>]
    #   history of lifted location
    # @return [Location::BasicLocation or nil]
    #   new location
    def lift_location(location, history=[])
      return nil if history.include?(location)

      template = TemplateEntry.new([:lift, location, nil])
      if lift_tuple = @bag.find(template)
        new_location = lift_tuple[2]
        return lift_location(new_location, history << location) || new_location
      end
      return nil
    end
  end

  class TupleSpaceProxy
    def take_all(tuple, sec=nil)
      @ts.take_all(tuple, sec)
    end
  end
end