lib/pione/patch/drb-patch.rb

Summary

Maintainability
C
1 day
Test Coverage
module Pione
  module DRbPatch
    #
    # special protocol
    #

    # Return waiter table for the aim that clients enable to wait to receive the
    # reply.
    def self.waiter_table
      @waiter_table ||= Pione::Util::WaiterTable.new
    end

    # ReplyReaderError is raised when reply reader happens something error. See
    # +ReplyReader+ class.
    class ReplyReaderError < RuntimeError
      attr_reader :inner_exception

      def initialize(exception)
        @inner_exception = exception
      end
    end

    class ReplyReader
      def initialize
        @watcher_lock = Mutex.new
        @watchers = Set.new
      end

      def start(protocol)
        @thread ||= Thread.new do
          begin
            # loop for receiving reply and waiting the result
            while true
              # receive a replay
              req_id, succ, result = protocol.recv_reply
              # register it to waiter table
              DRbPatch.waiter_table.push(req_id, [succ, result])
            end
          rescue => e
            @watcher_lock.synchronize do
              # pass the exception to watchers
              @watchers.each do |watcher|
                Log::Debug.communication("connection error happened in receiving reply.")
                Log::Debug.communication(e)
                watcher.raise(ReplyReaderError.new(e)) if watcher.alive?
              end

              # remove dead watchers
              @watchers.delete_if {|watcher| not(watcher.alive?)}
            end
          end
        end
      end

      # Makes reader thread for receiving unordered replies.
      def add_watcher(watcher)
        @watcher_lock.synchronize do
          @watchers << watcher
        end
      end

      # Remove the request reader thread watcher.
      def remove_watcher(watcher)
        @watcher_lock.synchronize do
          @watchers.delete_if {|th| th == watcher}
        end
      end
    end

    # +PioneTCPSocket+ is a reply reader thread extension for standard
    # +DRbTCPSocket+.
    class PioneTCPSocket < DRb::DRbTCPSocket
      def initialize(uri, soc, config={})
        super
        @reply_reader = ReplyReader.new
      end

      # Send the request from client to server.
      def send_request(ref, msg_id, arg, b)
        # set watcher
        @reply_reader.add_watcher(Thread.current)

        # send the request
        req_id = @msg.send_request(stream, ref, msg_id, arg, b)

        # start reply reader
        @reply_reader.start(self)

        # wait the reply by using watier table
        succ, result = Pione::DRbPatch.waiter_table.take(req_id, msg_id, arg)

        # remove watcher
        @reply_reader.remove_watcher(Thread.current)

        return succ, result
      end

      # Send the reply with request id. Note: this overrides original +send_rely+.
      def send_reply(req_id, succ, result)
        @msg.send_reply(req_id, stream, succ, result)
      end

      # Return true if connection socket exists.
      def alive?
        return (@socket and not(@socket.closed?))
      end
    end

    # +PioneDRbMessage+ is a special protocol for +PioneTCPSocket+.
    class PioneDRbMessage < DRb::DRbMessage
      def initialize(*args)
        @send_request_lock = Mutex.new
        @recv_request_lock = Mutex.new
        @send_reply_lock = Mutex.new
        @recv_reply_lock = Mutex.new
        super
      end

      # Send a request to the stream. This is different from original at the
      # point that patched version has request id.
      def send_request(stream, ref, msg_id, arg, b)
        # generate a new request id
        req_id = Util::UUID.generate_int

        # show debug message
        Log::Debug.communication do
          "client sends a request %s#%s (fd: %s, req_id: %s)" % [ref.__drburi, msg_id, stream.to_i, req_id]
        end

        # make a dumped request sequece(request id, ref, msg_id, argc, argv, b)
        data = [
          req_id, ref.__drbref, msg_id.id2name, arg.length, *arg, b
        ].map{|elt| dump(elt)}.join('')

        @send_request_lock.synchronize {stream.write(data)}

        return req_id
      rescue => e
        Log::Debug.communication "following error happened while we send request"
        Log::Debug.communication e
        raise DRb::DRbConnError.new, $!.message, $!.backtrace
      end

      # Receive request from the stream. See +ClientReuqest+.
      def recv_request(stream)
        Log::Debug.communication "server tries to receive a request... (fd: %s)" % stream.to_i

        @recv_request_lock.synchronize do
          # read requst id, object id, method name, and arguments size
          req_id = load(stream)
          ref = load(stream)
          msg_id = load(stream)
          argc = load(stream)

          Log::Debug.communication do
            "server received a request (fd: %s, req_id: %s, ref: %s, msg_id: %s)" % [stream.to_i, req_id, ref.to_s, msg_id]
          end

          # check arguement size
          raise DRb::DRbConnError.new("too many arguments") if @argc_limit < argc

          ro = nil
          available = true

          # refer to object
          begin
            ro = DRb.to_obj(ref)
          rescue RangeError => e
            Log::Debug.system("bad object id \"%s\" is referred (msg_id: %s)" % [ref, msg_id])
            available = false
          end

          # build arguments
          argv = Array.new(argc, nil)
          argc.times {|n| argv[n] = load(stream)}

          # read block
          block = load(stream)

          return req_id, ro, msg_id, argv, block, available
        end
      end

      # Send the reply.
      def send_reply(req_id, stream, succ, result)
        Log::Debug.communication {
          "server sends a reply (fd: %s, req_id: %s, result: %s)" % [stream.to_i, req_id, result]
        }

        # build a reply data
        data = dump(req_id) + dump(succ) + dump(result, !succ)

        @send_reply_lock.synchronize {stream.write(data)}
      rescue
        raise DRb::DRbConnError, $!.message, $!.backtrace
      end

      # Receive a reply(request id, succ, and result) from the stream.
      def recv_reply(stream)
        Log::Debug.communication do
          "client tries to receive a reply... (fd: %s)" % stream.to_i
        end

        @recv_reply_lock.synchronize do
          req_id = load(stream)
          succ = load(stream)
          result = load(stream)

          Log::Debug.communication(
            "client received a reply (fd: %s, req_id: %s)" % [stream.to_i, req_id]
          )

          return req_id, succ, result
        end
      end
    end

    # +PioneDRbConn+ provides connections to +DRb::DRbObject+. This class is
    # different from original +DRbConn+ at the point of connection reuse.
    class PioneDRbConn < DRb::DRbConn
      @cache = {} # connection table
      @retry = {} # retrial counter
      @mutex = Mutex.new # same as original's

      class << self
        attr_reader :cache

        # Clear connection cache table.
        def clear_cache
          @cache.values {|connection| connection.close rescue nil}
          @cache.clear
        end

        # Open a remote URI. This method reuse connection if the URI is cached.
        def open(remote_uri)
          conn = nil

          @mutex.synchronize do
            cache = @cache[remote_uri]

            # get connection
            if not(cache.nil?) and cache.alive?
              conn = cache # use cached connection
            else
              conn = self.new(remote_uri) # create a new connection
              Log::Debug.communication "client created a new connection to %s" % remote_uri.inspect
            end
            @cache[remote_uri] = conn
          end

          succ, result = yield(conn)
          @retry[remote_uri] = 0
          return succ, result
        rescue DRb::DRbConnError, ReplyReaderError, Errno::ECONNREFUSED => e
          Log::Debug.communication "client failed to open a connection to %s." % remote_uri
          @mutex.synchronize do
            if @cache[remote_uri]
              @cache[remote_uri].close
              @cache.delete(remote_uri)
            end
            @retry[remote_uri] ||= 0
            @retry[remote_uri] += 1
          end
          if @retry[remote_uri] < 6
            sleep 0.1
            retry
          else
            raise
          end
        end
      end

      # Close the client-to-server socket.
      def close
        Log::Debug.communication("client closed the socket")
        unless @closed
          @closed = true
          self.class.cache.delete(@uri)
          super
        end
      end

      # Send the message from client to server.
      def send_message(ref, msg_id, arg, block)
        @protocol.send_request(ref, msg_id, arg, block)
      end
    end

    #
    # special server
    #

    # BadRequestError is raised when the object id requested by client is
    # unknonw in server.
    class BadRequestError < StandardError
    end

    # ClientRequest represents client's requests.
    class ClientRequest
      def self.receive(client)
        self.new(*client.recv_request)
      end

      attr_reader :req_id
      attr_reader :obj
      attr_reader :msg_id
      attr_reader :argv
      attr_reader :block
      attr_reader :available

      def initialize(req_id, obj, msg_id, argv, block, available)
        @req_id = req_id
        @obj = obj
        @msg_id = msg_id.intern
        @argv = argv
        @block = block
        @available = available
      end

      def eval
        if @available
          @block ? eval_with_block : eval_without_block
        else
          raise BadRequestError
        end
      end

      private

      # Checks whether it can invoke method.
      def valid?
        return false unless @req_id
        return false unless @msg_id
        return false unless @argv
        return true
      end

      def eval_without_block
        if Proc === @obj && @msg_id == :__drb_yield
          ary = @argv.size == 1 ? @argv : [@argv]
          ary.map(&@obj)[0]
        else
          @obj.__send__(@msg_id, *@argv)
        end
      end

      def block_yield(x)
        if x.size == 1 && x[0].class == Array
          x[0] = DRbArray.new(x[0])
        end
        @block.call(*x)
      end

      def eval_with_block
        @obj.__send__(@msg_id, *@argv) do |*x|
          jump_error = nil
          begin
            block_value = block_yield(x)
          rescue LocalJumpError
            jump_error = $!
          end
          if jump_error
            case jump_error.reason
            when :break
              break(jump_error.exit_value)
            else
              raise jump_error
            end
          end
          block_value
        end
      end
    end

    class RequestInvoker
      def initialize(server, client, request)
        @server = server
        @client = client
        @request = request
        check_insecure_method
      end

      def invoke
        # evaluate request
        succ, result = execute_request

        # send_reply with req_id
        begin
          @client.send_reply(@request.req_id, succ, result)
        rescue => e
          Log::Debug.system("it happened communication failure in sending reply(req_id: %s): %s" % [@request.req_id, e.message])
        end
      end

      private

      # perform without setup_message
      def execute_request
        result = eval_request
        if @request.msg_id == :to_ary && result.class == Array
          result = DRbArray.new(result)
        end
        return true, result
      rescue StandardError, ScriptError => e
        return false, e
      end

      def check_insecure_method
        @server.check_insecure_method(@request.obj, @request.msg_id)
      end

      def eval_request
        $SAFE < @server.safe_level ? safe_eval_request : unsafe_eval_request
      end

      # Execute the request within sandbox.
      def safe_eval_request
        info = Thread.current['DRb']
        Thread.new do
          # import DRb info to the sandbox
          Thread.current['DRb'] = info

          # make sandbox
          $SAFE = @drb_server.safe_level

          # invoke request
          unsafe_request_invoke
        end.value
      end

      # Execute the request.
      def unsafe_eval_request
        @request.eval
      end
    end

    # RequestLooper is a receiver of client request. This is different from
    # standard DRb's +main_loop+ at the point that this method doesn't need to
    # wait finishing evaluation of request and reply.
    class RequestLooper
      def initialize(server)
        @server = server
      end

      def start(client)
        loop {handle_client_request(client)}
      end

      private

      def handle_client_request(client)
        # take request from client
        request = ClientRequest.receive(client)

        # run invoker
        invoker = RequestInvoker.new(@server, client, request)
        @server.invoker_threads.add(Thread.new{invoker.invoke})
      rescue DRb::DRbConnError => e
        Log::Debug.communication("server was disconnected from client because of connection error")
        client.close
        raise StopIteration
      end
    end

    class PioneDRbServer < DRb::DRbServer
      attr_reader :invoker_threads

      def initialize(uri=nil, front=nil, config_or_acl=nil)
        # current performing invokers
        @invoker_threads = ThreadGroup.new

        super
      end

      def main_loop
        if @protocol.uri =~ /^receiver:/
          RequestLooper.start(self, @protocol)
          @thread.kill.join # stop transceiver
        else
          Thread.start(@protocol.accept) do |client|
            # relay socket doesn't need request receiver loop because its aim is
            # to get connection only
            unless @protocol.kind_of?(Pione::Relay::RelaySocket)
              # set DRb info to current thread
              Thread.current['DRb'] = {'client' => client, 'server' => self}

              # add exported uri
              DRb.mutex.synchronize do
                client_uri = client.uri
                @exported_uri << client_uri unless @exported_uri.include?(client_uri)
              end

              # start request loop
              RequestLooper.new(self).start(client)
            end
          end
        end

        def stop_service
          # stop invokers
          @invoker_threads.list.each {|thread| thread.kill.join}

          # stop main loop etc.
          super
        end
      end
    end
  end
end

# @api private
module DRb
  class DRbConnError
    attr_reader :args

    def initialize(*args)
      super
      @args = args
    end
  end

  class DRbObject
    # Creates fake connection for relay.
    def __connect
      DRbConn.open(@uri) {}
    end
  end

  # change default protocol
  module DRbProtocol
    @protocol.delete(DRbTCPSocket)
    add_protocol(Pione::DRbPatch::PioneTCPSocket)
  end

  # replace some classes
  __verbose__ = $VERBOSE
  $VERBOSE = nil
  # patch DRbConn for special protocol
  const_set :DRbConn, Pione::DRbPatch::PioneDRbConn
  # patch DRbMessage for special protocol
  const_set :DRbMessage, Pione::DRbPatch::PioneDRbMessage
  # patch for threaded request invocations
  const_set :DRbServer, Pione::DRbPatch::PioneDRbServer
  $VERBOSE = __verbose__
end