# lib/diameter/stack.rb
require 'uri'
require 'socket'
require 'diameter/peer'
require 'diameter/message'
require 'diameter/stack_transport_helpers'
require 'diameter/diameter_logger'
require 'concurrent'
require 'dnsruby'
module Diameter
class Stack
include Internals
# @!group Setup methods
# Builds a new Diameter stack and the helper objects it relies on (the
# TCP helper, the thread pool used to wait for answers and the DNS
# resolver).
#
# @note The stack does not advertise any applications to peers by
#   default - {#add_handler} must be called early on.
#
# @param host [String] The Diameter Identity of this stack (for
#   the Origin-Host AVP).
# @param realm [String] The Diameter realm of this stack (for
#   the Origin-Realm AVP).
# @param opts [Hash] Optional settings.
# @option opts [Integer] :timeout (60)
#   The number of seconds intended to be waited for an answer before
#   the request is timed out. It is stored as the answer timeout.
def initialize(host, realm, opts={})
  # Local identity, used for the Origin-Host / Origin-Realm AVPs.
  @local_host, @local_realm = host, realm

  # Advertised applications, kept as [app_id, vendor] pairs.
  @auth_apps = []
  @acct_apps = []

  # Queues of outstanding requests, keyed by End-to-End identifier.
  @pending_ete = {}

  @tcp_helper = TCPStackHelper.new(self)
  @peer_table = {}
  @handlers = {}
  @answer_timeout = opts.fetch(:timeout, 60)

  # Fixed-size pool; when it is saturated the submitting thread runs
  # the task itself (:caller_runs).
  pool_settings = {
    min_threads: 5,
    max_threads: 5,
    max_queue: 1,
    fallback_policy: :caller_runs
  }
  @threadpool = Concurrent::ThreadPoolExecutor.new(**pool_settings)

  @res = Dnsruby::Resolver.new

  Diameter.logger.log(Logger::INFO, 'Stack initialized')
end
# Completes the stack initialization by asking the TCP helper to start
# its main loop, which begins reading from the TCP connections.
#
# @return whatever the TCP helper's start_main_loop returns.
def start
@tcp_helper.start_main_loop
end
# Begins listening for inbound Diameter connections (making this a
# Diameter server instead of just a client).
#
# @param port [Integer] The TCP port to listen on (default 3868).
# @param bind_address [String] The local address to bind the listening
#   socket to. Defaults to "0.0.0.0" (every IPv4 interface), which is
#   what this method always used before the parameter existed.
# @return whatever the TCP helper's setup_new_listen_connection returns.
def listen_for_tcp(port=3868, bind_address="0.0.0.0")
  @tcp_helper.setup_new_listen_connection(bind_address, port)
end
# Adds a handler for a specific Diameter application.
#
# @note If you expect to only send requests for this application,
#   not receive them, the block can be a no-op (e.g. `{ nil }`) or be
#   omitted entirely, in which case a no-op handler is registered.
#
# @param app_id [Integer] The Diameter application ID.
# @param opts [Hash] Advertising options; at least one of :auth or
#   :acct must be truthy.
# @option opts [true, false] :auth (false)
#   Whether we should advertise support for this application in
#   the Auth-Application-ID AVP.
# @option opts [true, false] :acct (false)
#   Whether we should advertise support for this application in
#   the Acct-Application-ID AVP.
# @option opts [Integer] :vendor (0)
#   If we should advertise support for this application in a
#   Vendor-Specific-Application-Id AVP, this specifies the
#   associated Vendor-Id. 0 means "not vendor-specific".
#
# @yield [req, cxn] Passes a Diameter message (and its originating
#   connection) for application-specific handling.
# @yieldparam [Message] req The parsed Diameter message from the peer.
# @yieldparam [Socket] cxn The TCP connection to the peer, to be
#   passed to {Stack#send_answer}.
# @raise [ArgumentError] if neither :auth nor :acct is set.
# @return [Proc] The handler that was registered.
def add_handler(app_id, opts={}, &blk)
  vendor = opts.fetch(:vendor, 0)
  auth = opts.fetch(:auth, false)
  acct = opts.fetch(:acct, false)
  raise ArgumentError, "Must specify at least one of auth or acct" unless auth || acct

  # Registering the same application twice must not advertise it twice.
  @acct_apps << [app_id, vendor] if acct && !@acct_apps.include?([app_id, vendor])
  @auth_apps << [app_id, vendor] if auth && !@auth_apps.include?([app_id, vendor])

  # Never store nil: the message dispatcher calls the handler
  # unconditionally once the application ID is registered.
  @handlers[app_id] = blk || proc { |_req, _cxn| nil }
end
# @!endgroup
# This shuts the stack down: the TCP helper is shut down, every
# request still waiting for an answer is woken with :shutdown, and the
# answer thread pool is stopped.
#
# The pool is first asked to shut down in an orderly way so that the
# waiting tasks get the chance to deliver :shutdown to their callers;
# it is only killed if it has not terminated after 5 seconds.
#
# @return [Boolean] the result of waiting for the pool to terminate
#   within the grace period.
def shutdown
  @tcp_helper.shutdown

  @pending_ete.each do |ete, q|
    Diameter.logger.debug("Shutting down queue #{q} as no answer has been received with EtE #{ete}")
    q.push :shutdown
  end

  # Killing the pool straight away would discard the tasks we have just
  # woken up, leaving their Promises pending forever.
  @threadpool.shutdown
  terminated = @threadpool.wait_for_termination(5)
  @threadpool.kill unless terminated
  terminated
end
# Closes the given connection by delegating to the TCP helper; any
# clean-up of internal data structures associated with the connection
# is left to that helper.
#
# Likely to be moved to the Peer object in a future release.
#
# @param connection [Socket] The connection to close.
# @return whatever the TCP helper's close returns.
def close(connection)
@tcp_helper.close(connection)
end
# @!group Peer connections and message sending
# Looks up the given Diameter realm with DNS-SRV
# (`_diameter._tcp.<realm>`), and establishes a connection to one peer
# in that realm: the record with the lowest priority, ties being
# broken in favour of the highest weight.
#
# @param realm [String] The Diameter realm to connect to.
# @return [Peer] The Diameter peer chosen.
# @raise [RuntimeError] if the lookup yields no SRV records.
def connect_to_realm(realm)
  possible_peers = []
  @res.query("_diameter._tcp.#{realm}", "SRV").each_answer do |a|
    possible_peers << {name: a.target.to_s, port: a.port, priority: a.priority, weight: a.weight}
  end

  # Without any record there is nothing to connect to; fail with a
  # clear message instead of a NoMethodError on nil further down.
  fail "No SRV records found for realm #{realm}" if possible_peers.empty?

  # Prefer the lowest priority and the highest weight. (A comparator of
  # the form `(a <=> b) || ...` never reaches the tie-breaker because 0
  # is truthy in Ruby, hence the composite sort key.)
  possible_peers = possible_peers.sort_by { |candidate| [candidate[:priority], -candidate[:weight]] }
  Diameter.logger.debug("Sorted list of peers for realm #{realm} is #{possible_peers.inspect}")

  primary = possible_peers.first
  url = "aaa://#{primary[:name]}:#{primary[:port]}"
  Diameter.logger.info("Primary peer for realm #{realm} is #{primary[:name]}, (#{url})")
  connect_to_peer(url, primary[:name], realm)
end
# Creates a Peer connection to a Diameter agent at the specific
# network location indicated by peer_uri, registers the peer in the
# peer table in state :WAITING and sends it a
# Capabilities-Exchange-Request (command code 257).
#
# @param peer_uri [URI, String] The aaa:// URI identifying the peer.
#   Should contain a hostname/IP; may contain a port (default 3868).
# @param peer_host [String] The DiameterIdentity of this peer, which
#   will uniquely identify it in the peer table.
# @param realm [String] The Diameter realm of this peer.
# @return [Peer] The Diameter peer chosen.
def connect_to_peer(peer_uri, peer_host, realm)
  peer = Peer.new(peer_host, realm)
  @peer_table[peer_host] = peer
  # Will move to :UP when the CEA is received
  peer.state = :WAITING

  uri = URI(peer_uri)
  # URI knows no default port for the aaa:// scheme, so uri.port is nil
  # when the URI omits it; apply the documented default explicitly.
  cxn = @tcp_helper.setup_new_connection(uri.host, uri.port || 3868)
  peer.cxn = cxn

  avps = [AVP.create('Origin-Host', @local_host),
          AVP.create('Origin-Realm', @local_realm),
          # NOTE(review): the advertised address is hard-coded to loopback.
          AVP.create('Host-IP-Address', IPAddr.new('127.0.0.1')),
          AVP.create('Vendor-Id', 100),
          AVP.create('Product-Name', 'ruby-diameter')]
  avps += app_avps

  cer = Message.new(version: 1, command_code: 257, app_id: 0, request: true,
                    proxyable: false, retransmitted: false, error: false, avps: avps)
  @tcp_helper.send(cer.to_wire, cxn)
  peer
end
# Sends a Diameter request. Unless a peer is forced through
# options[:peer], the request is routed to a peer based on the
# Destination-Host AVP (or, if that is absent, on the
# Destination-Realm AVP). When routing by realm, peers in state :UP
# are preferred over peers of the same realm in any other state.
#
# This adds this stack's Origin-Host and Origin-Realm AVPs, if
# those AVPs don't already exist.
#
# @param req [Message] The request to send.
# @param options [Hash] Optional settings.
# @option options [Peer] :peer A peer to use as the first hop for the
#   message, bypassing Destination-Host/Destination-Realm routing.
# @return [Concurrent::Promise] A promise whose value is whatever gets
#   pushed onto the request's queue (the answer, or :shutdown when the
#   stack is shut down first).
#   NOTE(review): when the selected peer is not :UP only a warning is
#   logged and the logger's return value is returned instead of a
#   Promise - callers need to cope with that.
# @raise [RuntimeError] if req is not a request, if it carries neither
#   Destination-Host nor Destination-Realm, or if no peer matches.
def send_request(req, options={})
  fail "Must pass a request" unless req.request
  req.add_origin_host_and_realm(@local_host, @local_realm)

  peer = options[:peer]
  if peer.nil?
    peer = if req['Destination-Host']
             peer_identity = req['Destination-Host'].octet_string
             Diameter.logger.debug("Selecting peer by Destination-Host (#{peer_identity})")
             @peer_table[peer_identity]
           elsif req['Destination-Realm']
             realm = req['Destination-Realm'].octet_string
             Diameter.logger.debug("Selecting peer by Destination-Realm (#{realm})")
             in_realm = @peer_table.values.select { |candidate| candidate.realm == realm }
             # Do not pick a peer that cannot carry the request while a
             # usable one exists; fall back to any peer of the realm so
             # the "not UP" warning below is still produced otherwise.
             usable = in_realm.select { |candidate| candidate.state == :UP }
             (usable.empty? ? in_realm : usable).sample
           else
             fail "Request must have Destination-Host or Destination-Realm"
           end
  else
    Diameter.logger.debug("Peer selection forced to #{peer.identity}")
  end

  if peer.nil?
    Diameter.logger.warn("No peer is available to send message - cannot route")
    fail "No acceptable peer"
  elsif peer.state == :UP
    q = Queue.new
    @pending_ete[req.ete] = q
    begin
      @tcp_helper.send(req.to_wire, peer.cxn)
    rescue StandardError
      # Nobody will ever wait on this queue, so do not leave it behind.
      @pending_ete.delete(req.ete)
      raise
    end

    # NOTE: the request timeout is currently disabled, so @answer_timeout
    # is not applied here and the Promise stays pending until something
    # is pushed onto the queue. The disabled implementation was:
    #
    #   Diameter.logger.debug("Scheduling timeout for #{@answer_timeout}s time")
    #   Concurrent::timer(@answer_timeout) do
    #     Diameter.logger.debug("Timing out message with EtE #{req.ete}")
    #     q = @pending_ete.delete(req.ete)
    #     if q
    #       q.push(:timeout)
    #     end
    #   end

    # The pool thread blocks on the queue until the answer handler (or
    # #shutdown) pushes a value, which becomes the Promise's value.
    Concurrent::Promise.execute(executor: @threadpool) {
      Diameter.logger.debug("Waiting for answer to message with EtE #{req.ete}, queue #{q}")
      val = q.pop
      Diameter.logger.debug("Promise fulfilled for message with EtE #{req.ete}")
      val
    }
  else
    Diameter.logger.warn("Peer #{peer.identity} is in state #{peer.state} - cannot route")
  end
end
# Sends a Diameter answer over the connection the corresponding
# request was received on (which has to be passed in to this method).
#
# This stack's Origin-Host and Origin-Realm AVPs are added first, if
# those AVPs don't already exist.
#
# @param ans [Message] The Diameter answer.
# @param original_cxn [Socket] The connection which the request
#   came in on. This will have been passed to the block registered
#   with {Stack#add_handler}.
# @raise [RuntimeError] if ans is not an answer.
def send_answer(ans, original_cxn)
  raise "Must pass an answer" unless ans.answer

  ans.add_origin_host_and_realm(@local_host, @local_realm)
  wire_bytes = ans.to_wire
  @tcp_helper.send(wire_bytes, original_cxn)
end
# Reports the current state of a peer. Peers that are not present in
# the peer table are reported as :CLOSED.
#
# @param id [String] The Diameter identity of the peer.
# @return [Symbol] The state of the peer (:UP, :WAITING or :CLOSED).
def peer_state(id)
  return :CLOSED unless @peer_table.key?(id)

  @peer_table[id].state
end
# @!endgroup
# @private
# Handles a Diameter message straight from a network connection.
# Intended to be called by TCPStackHelper after it retrieves a
# message, not directly by users.
#
# Messages without an Origin-Host AVP are ignored, as are messages
# that claim to come from a known peer but arrive on a connection
# other than the one recorded for that peer. A Capabilities-Exchange
# request is exempt from the connection check, so a known peer can
# still (re-)establish itself on a new connection.
#
# @param msg_bytes The raw message, as accepted by Message.from_bytes.
# @param cxn The connection the message was read from.
def handle_message(msg_bytes, cxn)
  msg = Message.from_bytes(msg_bytes)
  Diameter.logger.debug("Handling message #{msg}")

  origin_host = msg.avp_by_name('Origin-Host')
  if origin_host.nil?
    # Network input: do not let a malformed message raise in the read loop.
    Diameter.logger.warn("Ignoring message without Origin-Host AVP (Command-Code #{msg.command_code})")
    return
  end
  peer = origin_host.octet_string
  capabilities_request = msg.command_code == 257 && msg.request

  # Common processing - ensure that this message has come in on this
  # peer's expected connection, and update the last time we saw
  # activity on this peer
  known_peer = @peer_table[peer]
  if known_peer
    if known_peer.cxn == cxn
      known_peer.reset_timer
    elsif !capabilities_request
      Diameter.logger.log(Logger::WARN, "Ignoring message - claims to be from #{peer} but comes from #{cxn} not #{known_peer.cxn}")
      return
    end
  end

  if msg.command_code == 257 && msg.answer
    handle_cea(msg, cxn)
  elsif capabilities_request
    handle_cer(msg, cxn)
  elsif msg.command_code == 280 && msg.request
    handle_dwr(msg, cxn)
  elsif msg.command_code == 280 && msg.answer
    # No-op - we've already updated our timestamp
  elsif msg.answer
    handle_other_answer(msg, cxn)
  elsif (handler = @handlers[msg.app_id])
    handler.call(msg, cxn)
  else
    Diameter.logger.warn("Ignoring message from unrecognised application #{msg.app_id} (Command-Code #{msg.command_code})")
  end
end
private
# Builds the AVPs advertising the registered applications: first every
# auth application, then every acct application. An application with
# vendor 0 becomes a plain Auth-/Acct-Application-Id AVP; any other
# vendor is wrapped in a Vendor-Specific-Application-Id AVP together
# with its Vendor-Id.
#
# @return [Array] The AVPs, in registration order per category.
def app_avps
  advertised = [["Auth-Application-Id", @auth_apps],
                ["Acct-Application-Id", @acct_apps]]

  advertised.flat_map do |id_avp_name, apps|
    apps.map do |app_id, vendor|
      next AVP.create(id_avp_name, app_id) if vendor == 0

      grouped = [AVP.create(id_avp_name, app_id), AVP.create("Vendor-Id", vendor)]
      AVP.create("Vendor-Specific-Application-Id", grouped)
    end
  end
end
# Works out which applications we have handlers for that the peer also
# advertises in its capabilities message. The peer's application IDs
# are read from its Auth-Application-Id and Acct-Application-Id AVPs,
# both at the top level and inside Vendor-Specific-Application-Id AVPs.
#
# @param capabilities_msg [Message] The peer's CER (or CEA).
# @return [Set] The application IDs present on both sides.
def shared_apps(capabilities_msg)
  advertised = %w[Auth-Application-Id Acct-Application-Id].flat_map do |id_avp_name|
    top_level = capabilities_msg.all_avps_by_name(id_avp_name).map(&:uint32)

    vendor_specific = capabilities_msg.all_avps_by_name("Vendor-Specific-Application-Id")
    nested = vendor_specific
             .select { |grouped| grouped.inner_avp(id_avp_name) }
             .map { |grouped| grouped.inner_avp(id_avp_name).uint32 }

    top_level + nested
  end

  Diameter.logger.debug("Received app IDs #{advertised} from peer, have apps #{@handlers.keys}")
  @handlers.keys.to_set & advertised.to_set
end
# Answers a Capabilities-Exchange-Request. The CEA carries result code
# 2001 when the peer shares at least one application with us and 5010
# otherwise. On success the peer is recorded in the peer table as :UP
# on this connection; on failure the connection is closed after the
# CEA has been sent.
#
# @param cer [Message] The received CER.
# @param cxn The connection the CER arrived on.
def handle_cer(cer, cxn)
  rc = shared_apps(cer).empty? ? 5010 : 2001

  identity_avps = [AVP.create('Origin-Host', @local_host),
                   AVP.create('Origin-Realm', @local_realm)]
  cea = cer.create_answer(rc, avps: identity_avps + app_avps)
  @tcp_helper.send(cea.to_wire, cxn)

  # Nothing in common with this peer: drop the connection.
  return @tcp_helper.close(cxn) unless rc == 2001

  peer = cer.avp_by_name('Origin-Host').octet_string
  realm = cer.avp_by_name('Origin-Realm').octet_string
  Diameter.logger.debug("Creating peer table entry for peer #{peer} in realm #{realm}")

  entry = Peer.new(peer, realm)
  @peer_table[peer] = entry
  entry.state = :UP
  entry.reset_timer
  entry.cxn = cxn
end
# Processes a Capabilities-Exchange-Answer by marking the answering
# peer as :UP and resetting its timer. If the Origin-Host of the CEA
# is not in the peer table, the peer is looked up by connection
# instead and re-keyed under the identity it announced; a CEA that
# matches neither is only logged.
#
# @param cea [Message] The received CEA.
# @param cxn The connection the CEA arrived on.
def handle_cea(cea, cxn)
  host = cea.avp_by_name('Origin-Host').octet_string
  mark_up = lambda do |peer|
    peer.state = :UP
    peer.reset_timer
  end

  return mark_up.call(@peer_table[host]) if @peer_table.key?(host)

  # Unknown identity: maybe a peer we connected to under another name.
  old_host, renamed = @peer_table.find { |_identity, candidate| candidate.cxn == cxn }
  if renamed
    Diameter.logger.warn("Peer identity changed #{old_host} => #{host}")
    @peer_table.delete(old_host)
    renamed.identity = host
    @peer_table[host] = renamed
    mark_up.call(renamed)
  else
    Diameter.logger.warn("Ignoring CEA from unknown peer #{host}")
    Diameter.logger.debug("Known peers are #{@peer_table.keys}")
  end
end
# Placeholder, presumably for Disconnect-Peer-Request handling (going
# by the name - confirm). The body is empty and nothing in the visible
# code calls it.
def handle_dpr
end
# Placeholder, presumably for Disconnect-Peer-Answer handling (going by
# the name - confirm). The body is empty and nothing in the visible
# code calls it.
def handle_dpa
end
# Replies to a Device-Watchdog-Request with an answer carrying result
# code 2001 and this stack's Origin-Host and Origin-Realm AVPs, sent
# back on the connection the request arrived on.
#
# @param dwr [Message] The received watchdog request.
# @param cxn The connection the request arrived on.
def handle_dwr(dwr, cxn)
  identity_avps = [AVP.create('Origin-Host', @local_host),
                   AVP.create('Origin-Realm', @local_realm)]
  dwa = dwr.create_answer(2001, avps: identity_avps)

  # send DWA
  @tcp_helper.send(dwa.to_wire, cxn)
end
# Placeholder, presumably for Device-Watchdog-Answer handling (going by
# the name - confirm). The body is empty and nothing in the visible
# code calls it; the message dispatcher treats watchdog answers as a
# no-op itself.
def handle_dwa
end
# Placeholder with an empty body. Nothing in the visible code calls it;
# application requests are passed to the blocks registered through
# add_handler instead.
def handle_other_request
end
# Hands an application-level answer to whoever is waiting for it: the
# answer is pushed onto the queue registered for its End-to-End
# identifier, and that registration is removed. Answers whose
# End-to-End identifier is not pending (unsolicited, duplicate or
# late answers) are logged and ignored.
#
# @param msg [Message] The received answer.
# @param _cxn The connection the answer arrived on (unused).
def handle_other_answer(msg, _cxn)
  Diameter.logger.debug("Handling answer with End-to-End identifier #{msg.ete}")

  # Look up and unregister in one step; a missing entry must not raise
  # in the message loop, since the identifier comes from the network.
  q = @pending_ete.delete(msg.ete)
  if q.nil?
    Diameter.logger.warn("Ignoring answer with unknown End-to-End identifier #{msg.ete}")
    return
  end

  q.push msg
  Diameter.logger.debug("Passed answer to fulfil sender's Promise object'")
end
end
end