liquidm/zmachine

View on GitHub
lib/zmachine/connection.rb

Summary

Maintainability
C
7 hrs
Test Coverage
java_import java.io.IOException
java_import java.nio.ByteBuffer
java_import java.nio.channels.SelectionKey

require 'zmachine'

module ZMachine
  class Connection

    extend Forwardable

    attr_accessor :channel
    attr_reader :timer

    def self.new(*args)
      allocate.instance_eval do
        initialize(*args)
        @args = args
        post_init
        self
      end
    end

    # channel type dispatch

    def bind(address, port_or_type, &block)
      ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
      klass = channel_class(address)
      @channel = klass.new
      @channel.bind(sanitize_adress(address, klass), port_or_type)
      @block = block
      @block.call(self) if @block && @channel.is_a?(ZMQChannel)
      self
    end

    def connect(address, port_or_type, &block)
      ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
      klass = channel_class(address)
      @channel = klass.new
      @channel.connect(sanitize_adress(address, klass), port_or_type) if address
      yield self if block_given?
      renew_timer
      self
    end

    def channel_class(address)
      protocol = address.match(/\w+:\/\//)
      if protocol == nil
        return TCPChannel
      elsif protocol[0] == 'msg://'
        return TcpMsgChannel
      end

      ZMQChannel
    end

    def sanitize_adress(address, channel_klass)
      if channel_klass == TcpMsgChannel
        return address.match(/:\/\/(.+)/)[1]
      end

      address
    end

    # callbacks
    def connection_accepted
    end

    def connection_completed
    end

    def post_init
    end

    def receive_data(data)
    end

    def unbind
    end

    # EventMachine Connection API

    def_delegator :@channel, :bound?
    def_delegator :@channel, :can_send?
    def_delegator :@channel, :closed?
    def_delegator :@channel, :connected?
    def_delegator :@channel, :connection_pending?

    def close_connection(after_writing = false)
      ZMachine.close_connection(self, after_writing)
    end

    alias :close :close_connection

    def close_connection_after_writing
      close_connection(true)
    end

    alias :close_after_writing close_connection_after_writing

    def close!
      @timer.cancel if @timer
      @channel.close!
    end

    def comm_inactivity_timeout
      @inactivity_timeout
    end

    def comm_inactivity_timeout=(value)
      @inactivity_timeout = value
    end

    alias :set_comm_inactivity_timeout :comm_inactivity_timeout=

    def get_idle_time
      (System.nano_time - @last_activity) / 1_000_000
    end

    def get_peername
      if peer = @channel.peer
        ::Socket.pack_sockaddr_in(*peer)
      end
    end

    def notify_readable?
      true
    end

    def notify_writable?
      @channel.can_send?
    end

    def pending_connect_timeout=(value)
      @connect_timeout = value
    end

    alias :set_pending_connect_timeout :pending_connect_timeout=

    def reconnect(server, port_or_type)
      ZMachine.reconnect(server, port_or_type, self)
    end

    def send_data(data)
      ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
      data = data.to_java_bytes if data.is_a?(String) # EM compat
      @channel.send_data(data)
      update_events
    end

    # triggers

    def acceptable!
      client = @channel.accept
      ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self, client: client) if ZMachine.debug
      connection = self.class.new(*@args)
      connection.channel = client
      @block.call(connection) if @block
      connection.connection_accepted
      connection
    end

    def connectable!
      ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
      @channel.finish_connecting
      @timer.cancel if @timer # cancel pending connect timer
      mark_active!
      connection_completed if @channel.connected?
      update_events
      nil
    end

    def readable!
      ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
      loop do
        mark_active!
        data = @channel.read_inbound_data
        if data
          receive_data(data)
          break unless @channel.more?
        else
          break
        end
      end
      nil
    end

    def writable!
      ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
      mark_active!
      @channel.write_outbound_data
      update_events
      nil
    end

    # selector registration

    def register(selector)
      ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self, fd: @channel.selectable_fd) if ZMachine.debug
      @channel_key = @channel.selectable_fd.register(selector, current_events, self)
    end

    def valid?
      @channel_key &&
      @channel_key.valid?
    end

    def update_events
      return unless valid?
      ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
      @channel_key.interest_ops(current_events)
    end

    def current_events
      if @channel.is_a?(ZMQChannel)
        return SelectionKey::OP_READ
      end

      if bound?
        return SelectionKey::OP_ACCEPT
      end

      if connection_pending?
        return SelectionKey::OP_CONNECT
      end

      events = 0

      events |= SelectionKey::OP_READ if notify_readable?
      events |= SelectionKey::OP_WRITE if notify_writable?

      return events
    end

    def process_events
      return unless valid?
      ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
      if @channel_key.connectable?
        connectable!
      elsif @channel_key.acceptable?
        acceptable!
      else
        writable! if @channel_key.writable?
        readable! if @channel_key.readable?
      end
    rescue Java::JavaNioChannels::CancelledKeyException
      ZMachine.close_connection(self)
    end

    def mark_active!
      @last_activity = System.nano_time
      renew_timer if @inactivity_timeout
    end

    def renew_timer
      @timer.cancel if @timer
      if connection_pending? && @connect_timeout
        @timer = ZMachine.add_timer(@connect_timeout) { ZMachine.close_connection(self, true, Errno::ETIMEDOUT) }
      elsif @inactivity_timeout
        @timer = ZMachine.add_timer(@inactivity_timeout) { ZMachine.close_connection(self, true, Errno::ETIMEDOUT) }
      end
    end

  end
end