adhearsion/adhearsion

View on GitHub
lib/adhearsion/call.rb

Summary

Maintainability
D
2 days
Test Coverage
# encoding: utf-8

require 'has_guarded_handlers'
require 'thread'
require 'active_support/hash_with_indifferent_access'
require 'active_support/core_ext/hash/indifferent_access'
require 'adhearsion'

module Adhearsion
  ##
  # Encapsulates call-related data and behavior.
  #
  class Call

    Hangup          = Class.new Adhearsion::Error
    CommandTimeout  = Class.new Adhearsion::Error
    ExpiredError    = Class.new Celluloid::DeadActorError

    # @private
    class ActorProxy < Celluloid::CellProxy
      def method_missing(meth, *args, &block)
        super(meth, *args, &block)
      rescue ::Celluloid::DeadActorError
        raise ExpiredError, "This call is expired and is no longer accessible. See http://adhearsion.com/docs/calls for further details."
      end

      def active?
        alive? && super
      rescue ExpiredError
        false
      end
    end

    include Celluloid
    include HasGuardedHandlers

    proxy_class Call::ActorProxy

    execute_block_on_receiver :register_handler, :register_tmp_handler, :register_handler_with_priority, :register_handler_with_options, :register_event_handler, :on_joined, :on_unjoined, :on_end, :execute_controller, *execute_block_on_receiver
    finalizer :finalize

    # @return [Symbol] the reason for the call ending
    attr_reader :end_reason

    # @return [String] the reason code for the call ending
    attr_reader :end_code

    # @return [Array<Adhearsion::CallController>] the set of call controllers executing on the call
    attr_reader :controllers

    # @return [Hash<String => String>] a collection of SIP headers set during the call
    attr_reader :variables

    # @return [Time] the time at which the call began. For inbound calls this is the time at which the call was offered to Adhearsion. For outbound calls it is the time at which the remote party answered.
    attr_reader :start_time

    # @return [Time] the time at which the call began. For inbound calls this is the time at which the call was offered to Adhearsion. For outbound calls it is the time at which the remote party answered.
    attr_reader :end_time

    # @return [true, false] whether or not the call should be automatically hung up after executing its controller
    attr_accessor :auto_hangup

    # @return [Integer] the number of seconds after the call is hung up that the controller will remain active
    attr_accessor :after_hangup_lifetime

    delegate :[], :[]=, :to => :variables

    # @return [String] the value of the To header from the signaling protocol
    delegate :to, to: :offer, allow_nil: true

    # @return [String] the value of the From header from the signaling protocol
    delegate :from, to: :offer, allow_nil: true

    def self.uri(transport, id, domain)
      return nil unless id
      s = ""
      s << transport << ":" if transport
      s << id
      s << "@" << domain if domain
      s
    end

    def initialize(offer = nil)
      register_initial_handlers

      @offer        = nil
      @tags         = []
      @commands     = CommandRegistry.new
      @variables    = HashWithIndifferentAccess.new
      @controllers  = []
      @end_reason   = nil
      @end_code     = nil
      @end_blocker  = Celluloid::Condition.new
      @peers        = {}
      @duration     = nil
      @auto_hangup  = true
      @after_hangup_lifetime = nil

      self << offer if offer
    end

    #
    # @return [String, nil] The globally unique ID for the call
    #
    def id
      offer.target_call_id if offer
    end
    alias :to_s :id

    #
    # @return [String, nil] The domain on which the call resides
    #
    def domain
      offer.domain if offer
    end

    #
    # @return [String, nil] The uri at which the call resides
    #
    def uri
      self.class.uri(transport, id, domain)
    end

    #
    # @return [Array] The set of labels with which this call has been tagged.
    #
    def tags
      @tags.clone
    end

    #
    # Tag a call with an arbitrary label
    #
    # @param [String, Symbol] label String or Symbol with which to tag this call
    #
    def tag(label)
      abort ArgumentError.new "Tag must be a String or Symbol" unless [String, Symbol].include?(label.class)
      @tags << label
    end

    #
    # Remove a label
    #
    # @param [String, Symbol] label
    #
    def remove_tag(label)
      @tags.reject! { |tag| tag == label }
    end

    #
    # Establish if the call is tagged with the provided label
    #
    # @param [String, Symbol] label
    #
    def tagged_with?(label)
      @tags.include? label
    end

    #
    # Hash of joined peers
    # @return [Hash<String => Adhearsion::Call>]
    #
    def peers
      @peers.clone
    end

    #
    # Wait for the call to end. Returns immediately if the call has already ended, else blocks until it does so.
    # @param [Integer, nil] timeout a timeout after which to unblock, returning `:timeout`
    # @return [Symbol] the reason for the call ending
    # @raises [Celluloid::ConditionError] in case of a specified timeout expiring
    #
    def wait_for_end(timeout = nil)
      if end_reason
        end_reason
      else
        @end_blocker.wait(timeout)
      end
    rescue Celluloid::ConditionError => e
      abort e
    end

    #
    # Register a handler for events on this call. Note that Adhearsion::Call implements the has-guarded-handlers API, and all of its methods are available. Specifically, all Adhearsion events are available on the `:event` channel.
    #
    # @param [guards] guards take a look at the guards documentation
    #
    # @yield [Object] trigger_object the incoming event
    #
    # @return [String] handler ID for later manipulation
    #
    # @see http://adhearsion.github.io/has-guarded-handlers for more details
    #
    def register_event_handler(*guards, &block)
      register_handler :event, *guards, &block
    end

    def deliver_message(message)
      logger.debug "Receiving message: #{message.inspect}"
      catching_standard_errors do
        trigger_handler :event, message, broadcast: true, exception_callback: ->(e) { Adhearsion::Events.trigger :exception, [e, logger] }
      end
    end
    alias << deliver_message

    def commands
      @commands.clone
    end

    # @private
    def register_initial_handlers
      register_event_handler Adhearsion::Event::Offer do |offer|
        @offer  = offer
        @client = offer.client
        @start_time = offer.timestamp.to_time
      end

      register_event_handler Adhearsion::HasHeaders do |event|
        merge_headers event.headers
      end

      on_joined do |event|
        if event.call_uri
          target = event.call_uri
          type = :call
        else
          target = event.mixer_name
          type = :mixer
        end
        logger.info "Joined to #{type} #{target}"
        call = Adhearsion.active_calls.with_uri(target)
        @peers[target] = call
        signal :joined, target
      end

      on_unjoined do |event|
        if event.call_uri
          target = event.call_uri
          type = :call
        else
          target = event.mixer_name
          type = :mixer
        end
        logger.info "Unjoined from #{type} #{target}"
        @peers.delete target
        signal :unjoined, target
      end

      on_end do |event|
        logger.info "Call #{from} -> #{to} ended due to #{event.reason}#{" (code #{event.platform_code})" if event.platform_code}"
        @end_time = event.timestamp.to_time
        @duration = @end_time - @start_time if @start_time
        clear_from_active_calls
        @end_reason = event.reason
        @end_code = event.platform_code
        @end_blocker.broadcast event.reason
        @commands.terminate
        after(@after_hangup_lifetime || Adhearsion.config.core.after_hangup_lifetime) { terminate }
      end
    end

    # @return [Float] The call duration until the current time, or until the call was disconnected, whichever is earlier
    def duration
      if @duration
        @duration
      elsif @start_time
        Time.now - @start_time
      else
        0.0
      end
    end

    ##
    # Registers a callback for when this call is joined to another call or a mixer
    #
    # @param [Call, String, Hash, nil] target the target to guard on. May be a Call object, a call ID (String, Hash) or a mixer name (Hash)
    # @option target [String] call_uri The call ID to guard on
    # @option target [String] mixer_name The mixer name to guard on
    #
    def on_joined(target = nil, &block)
      register_event_handler Adhearsion::Event::Joined, *guards_for_target(target) do |event|
        block.call event
      end
    end

    ##
    # Registers a callback for when this call is unjoined from another call or a mixer
    #
    # @param [Call, String, Hash, nil] target the target to guard on. May be a Call object, a call ID (String, Hash) or a mixer name (Hash)
    # @option target [String] call_uri The call ID to guard on
    # @option target [String] mixer_name The mixer name to guard on
    #
    def on_unjoined(target = nil, &block)
      register_event_handler Adhearsion::Event::Unjoined, *guards_for_target(target), &block
    end

    # @private
    def guards_for_target(target)
      target ? [target_from_join_options(join_options_with_target(target))] : []
    end

    def on_end(&block)
      register_event_handler Adhearsion::Event::End, &block
    end

    #
    # @return [Boolean] if the call is currently active or not (disconnected)
    #
    def active?
      !end_reason
    end

    def accept(headers = nil)
      @accept_command ||= write_and_await_response Adhearsion::Rayo::Command::Accept.new(:headers => headers)
    rescue Adhearsion::ProtocolError => e
      abort e
    end

    def answer(headers = nil)
      write_and_await_response Adhearsion::Rayo::Command::Answer.new(:headers => headers)
    rescue Adhearsion::ProtocolError => e
      abort e
    end

    def reject(reason = :busy, headers = nil)
      write_and_await_response Adhearsion::Rayo::Command::Reject.new(:reason => reason, :headers => headers)
      Adhearsion::Events.trigger_immediately :call_rejected, call: current_actor, reason: reason
    rescue Adhearsion::ProtocolError => e
      abort e
    end

    #
    # Redirect the call to some other target system.
    #
    # If the redirect is successful, the call will be released from the
    # telephony engine and Adhearsion will lose control of the call.
    #
    # Note that for the common case, this will result in a SIP 302 or
    # SIP REFER, which provides the caller with a new URI to dial. As such,
    # the redirect target cannot be any telephony-engine specific address
    # (such as sofia/gateway, agent/101, or SIP/mypeer); instead it should be a
    # fully-qualified external SIP URI that the caller can independently reach.
    #
    # @param [String] to the target to redirect to, eg a SIP URI
    # @param [Hash, optional] headers a set of headers to send along with the redirect instruction
    def redirect(to, headers = nil)
      write_and_await_response Adhearsion::Rayo::Command::Redirect.new(to: to, headers: headers)
    rescue Adhearsion::ProtocolError => e
      abort e
    end

    def hangup(headers = nil)
      return false unless active?
      logger.info "Hanging up"
      @end_reason = true
      write_and_await_response Adhearsion::Rayo::Command::Hangup.new(:headers => headers)
    rescue Adhearsion::ProtocolError => e
      abort e
    end

    # @private
    def clear_from_active_calls
      Adhearsion.active_calls.remove_inactive_call current_actor
    end

    ##
    # Joins this call to another call or a mixer
    #
    # @param [Call, String, Hash] target the target to join to. May be a Call object, a call ID (String, Hash) or a mixer name (Hash)
    # @option target [String] call_uri The call ID to join to
    # @option target [String] mixer_name The mixer to join to
    # @param [Hash, Optional] options further options to be joined with
    #
    # @return [Hash] where :command is the issued command, :joined_waiter is a #wait responder which is triggered when the join is complete, and :unjoined_waiter is a #wait responder which is triggered when the entities are unjoined
    #
    def join(target, options = {})
      logger.debug "Joining to #{target}"

      joined_condition = CountDownLatch.new(1)
      on_joined target do
        joined_condition.countdown!
      end

      unjoined_condition = CountDownLatch.new(1)
      on_unjoined target do
        unjoined_condition.countdown!
      end

      on_end do
        joined_condition.countdown!
        unjoined_condition.countdown!
      end

      command = Adhearsion::Rayo::Command::Join.new options.merge(join_options_with_target(target))
      write_and_await_response command
      {command: command, joined_condition: joined_condition, unjoined_condition: unjoined_condition}
    rescue Adhearsion::ProtocolError => e
      abort e
    end

    ##
    # Unjoins this call from another call or a mixer
    #
    # @param [Call, String, Hash, nil] target the target to unjoin from. May be a Call object, a call ID (String, Hash), a mixer name (Hash) or missing to unjoin from every existing join (nil)
    # @option target [String] call_uri The call ID to unjoin from
    # @option target [String] mixer_name The mixer to unjoin from
    #
    def unjoin(target = nil)
      logger.info "Unjoining from #{target}"
      command = Adhearsion::Rayo::Command::Unjoin.new join_options_with_target(target)
      write_and_await_response command
    rescue Adhearsion::ProtocolError => e
      abort e
    end

    # @private
    def join_options_with_target(target)
      case target
      when nil
        {}
      when Call
        { :call_uri => target.uri }
      when String
        { :call_uri => self.class.uri(transport, target, domain) }
      when Hash
        abort ArgumentError.new "You cannot specify both a call URI and mixer name" if target.has_key?(:call_uri) && target.has_key?(:mixer_name)
        target
      else
        abort ArgumentError.new "Don't know how to join to #{target.inspect}"
      end
    end

    # @private
    def target_from_join_options(options)
      call_uri = options[:call_uri]
      return {call_uri: call_uri} if call_uri
      {mixer_name: options[:mixer_name]}
    end

    def wait_for_joined(expected_target)
      target = nil
      until target == expected_target do
        target = wait :joined
      end
    end

    def wait_for_unjoined(expected_target)
      target = nil
      until target == expected_target do
        target = wait :unjoined
      end
    end

    def mute
      write_and_await_response Adhearsion::Rayo::Command::Mute.new
    rescue Adhearsion::ProtocolError => e
      abort e
    end

    def unmute
      write_and_await_response Adhearsion::Rayo::Command::Unmute.new
    rescue Adhearsion::ProtocolError => e
      abort e
    end

    # @private
    def write_and_await_response(command, timeout = 60, fatal = false)
      @commands << command
      write_command command

      error_handler = fatal ? ->(error) { raise error } : ->(error) { abort error }

      response = defer { command.response timeout }
      case response
      when Adhearsion::ProtocolError
        if response.name == :item_not_found
          error_handler[Hangup.new(@end_reason)]
        else
          error_handler[response]
        end
      when Exception
        error_handler[response]
      end

      command
    rescue Timeout::Error
      error_handler[CommandTimeout.new(command.to_s)]
    ensure
      @commands.delete command
    end

    # @private
    def write_command(command)
      abort Hangup.new(@end_reason) unless active? || command.is_a?(Adhearsion::Rayo::Command::Hangup)
      merge_headers command.headers if command.respond_to? :headers
      logger.debug "Executing command #{command.inspect}"
      unless command.is_a?(Adhearsion::Rayo::Command::Dial)
        command.target_call_id = id
        command.domain = domain
      end
      client.execute_command command
    end

    ##
    # Sends a message to the caller
    #
    # @param [String] body The message text.
    # @param [Hash, Optional] options The message options.
    # @option options [String] subject The message subject.
    #
    def send_message(body, options = {})
      logger.debug "Sending message: #{body}"
      client.send_message id, domain, body, options
    end

    # @private
    def logger_id
      "#{self.class}: #{id}@#{domain}"
    end
    # @private
    def inspect
      return "..." if Celluloid.detect_recursion
      attrs = [:offer, :end_reason, :commands, :variables, :controllers, :to, :from].map do |attr|
        "#{attr}=#{send(attr).inspect}"
      end
      "#<#{self.class}:#{id}@#{domain} #{attrs.join ', '}>"
    end

    #
    # Execute a call controller asynchronously against this call.
    #
    # To block and wait until the controller completes, call `#join` on the result of this method.
    #
    # @param [Adhearsion::CallController] controller an instance of a controller initialized for this call
    # @param [Proc] a callback to be executed when the controller finishes execution
    #
    # @yield execute the current block as the body of a controller by specifying no controller instance
    #
    # @return [Celluloid::ThreadHandle]
    #
    def execute_controller(controller = nil, completion_callback = nil, &block)
      raise ArgumentError, "Cannot supply a controller and a block at the same time" if controller && block_given?
      controller ||= CallController.new current_actor, &block
      logger.info "Executing controller #{controller.class}"
      controller.bg_exec completion_callback
    end

    # @private
    def register_controller(controller)
      @controllers << controller
    end

    # @private
    def pause_controllers
      controllers.each(&:pause!)
    end

    # @private
    def resume_controllers
      controllers.each(&:resume!)
    end

    private

    def offer
      @offer
    end

    def client
      @client
    end

    def transport
      offer.transport if offer
    end

    def merge_headers(headers)
      headers.each do |name, value|
        variables[name.to_s.downcase.gsub('-', '_')] = value
      end
    end

    def finalize
      ::Logging::Repository.instance.delete logger_id
    end

    # @private
    class CommandRegistry < Array
      def terminate
        hangup = Hangup.new
        each { |command| command.response = hangup if command.requested? }
      end
    end

  end
end