adhearsion/ruby_ami

View on GitHub
lib/ruby_ami/stream.rb

Summary

Maintainability
A
1 hr
Test Coverage
# encoding: utf-8
module RubyAMI
  class Stream
    class ConnectionStatus
      def name
        self.class.to_s
      end

      def eql?(other)
        other.is_a? self.class
      end

      alias :== :eql?
    end

    Connected = Class.new ConnectionStatus
    Disconnected = Class.new ConnectionStatus

    include Celluloid::IO

    attr_reader :logger

    finalizer :finalize

    def initialize(host, port, username, password, event_callback, logger = Logger, timeout = 0)
      super()
      @host, @port, @username, @password, @event_callback, @logger, @timeout = host, port, username, password, event_callback, logger, timeout
      logger.debug "Starting up..."
      @lexer = Lexer.new self
      @sent_actions   = {}
      @causal_actions = {}
      async.run
    end

    [:started, :stopped, :ready].each do |state|
      define_method("#{state}?") { @state == state }
    end

    def run
      Timeout::timeout(@timeout) do
        @socket = TCPSocket.from_ruby_socket ::TCPSocket.new(@host, @port)
      end
      post_init
      loop { receive_data @socket.readpartial(4096) }
    rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, SocketError => e
      logger.error "Connection failed due to #{e.class}. Check your config and the server."
    rescue EOFError
      logger.info "Client socket closed!"
    rescue Timeout::Error
      logger.error "Timeout exceeded while trying to connect."
    ensure
      async.terminate
    end

    def post_init
      @state = :started
      fire_event Connected.new
      login @username, @password if @username && @password
    end

    def version
      @lexer.ami_version
    end

    def send_data(data)
      @socket.write data
    end

    def send_action(name, headers = {}, error_handler = self.method(:abort))
      condition = Celluloid::Condition.new
      action = dispatch_action name, headers do |response|
        condition.signal response
      end
      condition.wait
      action.response.tap do |resp|
        if resp.is_a? Exception
          error_handler.call(resp)
        end
      end
    end

    def receive_data(data)
      logger.trace "[RECV] #{data}"
      @lexer << data
    end

    def message_received(message)
      logger.trace "[RECV] #{message.inspect}"
      case message
      when Event
        action = causal_action_for_event message
        if action
          action << message
          complete_causal_action_for_event message if action.complete?
        else
          fire_event message
        end
      when Response, Error
        action = sent_action_for_response message
        raise StandardError, "Received an AMI response with an unrecognized ActionID! #{message.inspect}" unless action
        action << message
      end
    end

    def syntax_error_encountered(ignored_chunk)
      logger.error "Encountered a syntax error. Ignoring chunk: #{ignored_chunk.inspect}"
    end

    alias :error_received :message_received

    private

    def login(username, password, event_mask = 'On')
      dispatch_action 'Login',
        'Username' => username,
        'Secret'   => password,
        'Events'   => event_mask
    end

    def dispatch_action(*args, &block)
      action = Action.new *args, &block
      logger.trace "[SEND] #{action.to_s}"
      register_sent_action action
      send_data action.to_s
      action
    end

    def fire_event(event)
      @event_callback.call event, current_actor
    end

    def register_sent_action(action)
      @sent_actions[action.action_id] = action
      register_causal_action action if action.has_causal_events?
    end

    def sent_action_with_id(action_id)
      @sent_actions.delete action_id
    end

    def sent_action_for_response(response)
      sent_action_with_id response.action_id
    end

    def register_causal_action(action)
      @causal_actions[action.action_id] = action
    end

    def causal_action_for_event(event)
      @causal_actions[event.action_id]
    end

    def complete_causal_action_for_event(event)
      @causal_actions.delete event.action_id
    end

    def finalize
      logger.debug "Finalizing stream"
      @socket.close if @socket
      @state = :stopped
      fire_event Disconnected.new
    end
  end
end