ruby/lib/metasploit/aggregator.rb
require 'socket'
require 'openssl'
require 'thread'
require 'securerandom'
require 'metasploit/aggregator/error'
require 'metasploit/aggregator/messages_pb'
require 'metasploit/aggregator/aggregator_services_pb'
require 'metasploit/aggregator/version'
require 'metasploit/aggregator/cable'
require 'metasploit/aggregator/connection_manager'
require 'metasploit/aggregator/https_forwarder'
require 'metasploit/aggregator/http'
require 'metasploit/aggregator/logger'
module Metasploit
module Aggregator
class Service
# return availability status of the service
def available?
# index for impl
end
# return the current service version found
def version
Metasploit::Aggregator::VERSION
end
# returns map of sessions available from the service
def sessions
# index for impl
end
def cables
# index for impl
end
# sets forwarding for a specific session to promote
# that session for local use, obtained sessions are
# not reported in getSessions
def obtain_session(payload, uuid)
# index for impl
end
# parks a session and makes it available in the getSessions
def release_session(payload)
# index for impl
end
# return any extended details for the payload requested
def session_details(payload)
end
# start a listening port maintained on the service
# connections are forwarded to any registered default
# TODO: may want to require a type here for future proof of api
def add_cable(type, host, port, certificate = nil)
# index for impl
end
def remove_cable(host, port)
# index for impl
end
def register_default(uuid, payload_list)
# index for impl
end
def default
# index for impl
end
# returns list of IP addressed available to the service
# TODO: consider also reporting "used" ports (may not be needed)
def available_addresses
# index for impl
end
# register the object to pass request from cables to
def register_response_channel(requester)
end
end
class ServerProxy < Service
attr_reader :uuid
@exit_lock = Mutex.new
@host = @port = @socket = nil
@no_params = nil
@response_queue = nil
@listening_thread = nil
@cleanup_list = nil
def initialize(host, port)
@host = host
@port = port
@client = Metasploit::Aggregator::Pb::Stub.new("#{@host}:#{@port}", :this_channel_is_insecure)
# TODO: add arg{ :channel_override => Core::Channel } to control connection
@uuid = SecureRandom.uuid
@no_params = Metasploit::Aggregator::Message::No_params.new
# server_version = pb_to_array(@client.version(@no_params).value)[0]
# raise CompatibilityError("server version mis-match found #{server_version}") unless server_version == version
end
def available?
begin
@client.available(@no_params).answer
rescue GRPC::Unavailable
false # unavailable if client throws exception.
end
end
def sessions
pb_to_map(@client.sessions(@no_params).map)
end
def cables
pb_to_array(@client.cables(@no_params).value)
end
def obtain_session(payload, uuid)
args = Metasploit::Aggregator::Message::String_array.new( value: [payload, uuid] )
@client.obtain_session(args).answer
end
def release_session(payload)
args = Metasploit::Aggregator::Message::String_array.new( value: [payload] )
@client.release_session(args).answer
end
def session_details(payload)
args = Metasploit::Aggregator::Message::String_array.new( value: [payload] )
pb_to_map(@client.session_details(args).map)
end
def add_cable(type, host, port, certificate = nil)
args = nil
if certificate.nil?
args = Metasploit::Aggregator::Message::Cable_def.new( type: type, host: host, port: port.to_i )
else
args = Metasploit::Aggregator::Message::Cable_def.new( type: type, host: host, port: port.to_i, pem: certificate )
end
@client.add_cable(args).answer
end
def remove_cable(host, port)
args = Metasploit::Aggregator::Message::String_array.new( value: [host, port] )
@client.remove_cable(args).answer
end
def register_default(uuid, payload_list)
uuid = "" if uuid.nil?
payloads = []
payloads = payload + payload_list unless payload_list.nil?
args = Metasploit::Aggregator::Message::Register.new( uuid: uuid, payloads: payloads )
@client.register_default(args).answer
end
def default
pb_to_array(@client.default(@no_params).value)[0]
end
def available_addresses
pb_to_array(@client.available_addresses(@no_params).value)
end
def stop(force = false)
# end the response queue
ServerProxy.unregister_for_cleanup(self) unless force
@response_queue.push(self) unless @response_queue.nil?
@listening_thread.join if @listening_thread
@listening_thread = nil
@client = nil
end
def register_response_channel(requester)
unless requester.kind_of? Metasploit::Aggregator::Http::Requester
raise ArgumentError("response channel class invalid")
end
@response_io = requester
process
end
protected
def self.register_for_cleanup(connection)
@exit_lock.synchronize do
unless @cleanup_list
@cleanup_list = ::Set.new
at_exit { ServerProxy.run_cleanup }
end
@cleanup_list.add connection
end
end
def self.unregister_for_cleanup(connection)
@exit_lock.synchronize do
@cleanup_list.delete connection if @cleanup_list
end
end
def self.run_cleanup
@exit_lock.synchronize do
if @cleanup_list
@cleanup_list.each do |connection|
connection.stop(true)
end
end
end
end
private
def pb_to_map(map)
result = {}
map.each do |key , value|
result[key] = value
end
result
end
def pb_to_array(array)
result = []
array.each do |value|
result << value
end
result
end
def process
@response_queue = EnumeratorQueue.new(self)
requests = @client.process(@response_queue.each_item)
# add initial key response with only local uuid
initial_response = Metasploit::Aggregator::Message::Response.new( uuid: @uuid )
@response_queue.push(initial_response)
@listening_thread = Thread.new do
begin
requests.each do |pb_request|
request = Metasploit::Aggregator::Http::Request.new(pb_to_array(pb_request.headers), pb_request.body, nil)
response = @response_io.process_request(request)
session_id = Metasploit::Aggregator::Http::Request.parse_uri(request)
pb_request = Metasploit::Aggregator::Message::Request.new( headers: response.headers, body: response.body )
pb_response = Metasploit::Aggregator::Message::Response.new( uuid: session_id, response: pb_request)
@response_queue.push(pb_response)
end
rescue GRPC::Unavailable
false # The remote connection has ended stop this processing thread.
end
end
ServerProxy.register_for_cleanup self
end
end
# A EnumeratorQueue wraps a Queue to yield the items added to it.
class EnumeratorQueue
extend Forwardable
def_delegators :@q, :push
def initialize(sentinel)
@q = Queue.new
@sentinel = sentinel
end
def each_item
return enum_for(:each_item) unless block_given?
loop do
r = @q.pop
break if r.equal?(@sentinel)
fail r if r.is_a? Exception
yield r
end
end
end
class ServerImpl < Metasploit::Aggregator::Pb::Service
def initialize
super
@local_server = Server.new
@requestThreads = {}
@listeners = []
end
def available(_no_params, _unused_call)
Metasploit::Aggregator::Message::Result.new( answer: @local_server.available? )
end
def version(_no_params, _unused_call)
Metasploit::Aggregator::Message::String_array.new( value: [ @local_server.version ] )
end
def sessions(_no_parms, _unused_call)
Metasploit::Aggregator::Message::Result_map.new( map: @local_server.sessions() )
end
def cables(_no_parms, _unused_call)
Metasploit::Aggregator::Message::String_array.new( value: @local_server.cables() )
end
def obtain_session(args, _unused_call)
payload, uuid = args.value
Metasploit::Aggregator::Message::Result.new( answer: @local_server.obtain_session(payload, uuid) )
end
def release_session(args, _unused_call)
payload = args.value.shift
Metasploit::Aggregator::Message::Result.new( answer: @local_server.release_session(payload) )
end
def session_details(args, _unused_call)
payload = args.value.shift
Metasploit::Aggregator::Message::Result_map.new( map: @local_server.session_details(payload) )
end
def add_cable(cable, _unused_call)
pem = nil
pem = cable.pem unless cable.pem.empty?
result = @local_server.add_cable(cable.type, cable.host, cable.port, pem)
Metasploit::Aggregator::Message::Result.new( answer: result )
end
def remove_cable(args, _unused_call)
host, port = args.value
result = @local_server.remove_cable(host, port)
Metasploit::Aggregator::Message::Result.new( answer: result )
end
def register_default(register, _unused_call)
payloads = nil
payloads = register.payloads unless register.payloads.empty?
result = @local_server.register_default(register.uuid, payloads)
Metasploit::Aggregator::Message::Result.new( answer: result )
end
def default(_no_params, _unused_call)
uuid = @local_server.default
return Metasploit::Aggregator::Message::String_array.new( value: [ uuid ] ) unless uuid.nil?
Metasploit::Aggregator::Message::String_array.new()
end
def available_addresses(_no_params, _unused_call)
addresses = @local_server.available_addresses
Metasploit::Aggregator::Message::String_array.new( value: addresses )
end
def process(responses)
requests = EnumeratorQueue.new(self)
uuid = nil
requestingThread = Thread.new do
loop do
sleep 0.1 # outer loop only occurs until uuid is set
break unless uuid.nil?
end
while true
request = @local_server.request(uuid)
# TODO: with this in place we can just get the request queue and pop each item to process and forward
sleep 0.1
unless request.nil?
body = ""
body = request.body unless request.body.nil?
pb_request = Metasploit::Aggregator::Message::Request.new( headers: request.headers, body: body )
requests.push(pb_request)
end
end
end
Thread.new do
responses.each do |response|
uuid = response.uuid if uuid.nil?
next if response.response.nil?
request_pb = response.response
request = Metasploit::Aggregator::Http::Request.new(request_pb.headers, request_pb.body, nil)
@local_server.respond(response.uuid, request)
end
requestingThread.exit
requestingThread.join
requests.push(self)
end
requests.each_item
end
end
class GrpcServer
@exit_lock = Mutex.new
def initialize(host, port)
@host = host
@port = port
# TODO: investigate using Core::Channel to secure this communication
# server = TCPServer.new(@host, @port)
# sslContext = OpenSSL::SSL::SSLContext.new
# sslContext.key, sslContext.cert = Metasploit::Aggregator::ConnectionManager.ssl_generate_certificate
# sslServer = OpenSSL::SSL::SSLServer.new(server, sslContext)
@svr = GRPC::RpcServer.new
@svr.add_http2_port("#{@host}:#{@port}", :this_port_is_insecure)
@svr.handle(ServerImpl)
@exec = Thread.new do
GrpcServer.register_for_cleanup(self)
@svr.run_till_terminated
end
end
def stop(force = false)
GrpcServer.unregister_for_cleanup(self) unless force
@svr.stop if @svr.running?
end
protected
def self.register_for_cleanup(connection)
@exit_lock.synchronize do
unless @cleanup_list
@cleanup_list = ::Set.new
at_exit { GrpcServer.run_cleanup }
end
@cleanup_list.add connection
end
end
def self.unregister_for_cleanup(connection)
@exit_lock.synchronize do
@cleanup_list.delete connection if @cleanup_list
end
end
def self.run_cleanup
@exit_lock.synchronize do
if @cleanup_list
@cleanup_list.each do |connection|
connection.stop(true)
end
end
end
end
end
class Server < Service
def initialize
@router = Router.instance
@manager = ConnectionManager.instance
end
def available?
!@manager.nil?
end
def sessions
@manager.connections
end
def cables
@manager.cables
end
def obtain_session(payload, uuid)
# return session object details or UUID/uri
# forwarding will cause new session creation on the console
# TODO: check and set lock on payload requested see note below in register_default
@manager.register_forward(uuid, [ payload ])
true # update later to return if lock obtained
end
def release_session(payload)
@manager.park(payload)
true # return always return success for now
end
def session_details(payload)
@manager.connection_details(payload)
end
def add_cable(type, host, port, certificate = nil)
unless @manager.nil?
case type
when Cable::HTTPS
# TODO: check if already listening on that port
@manager.add_cable_https(host, port, certificate)
when Cable::HTTP
@manager.add_cable_http(host, port)
else
Logger.log("#{type} cables are not supported.")
end
end
true
end
def remove_cable(host, port)
unless @manager.nil?
@manager.remove_cable(host, port)
end
end
def register_default(uuid, payload_list)
# add this payload list to each forwarder for this remote console
# TODO: consider adding boolean param to ConnectionManager.register_forward to 'lock'
@manager.register_forward(uuid, payload_list)
true
end
def default
_send, _recv, console = @router.get_forward('default')
console
end
def available_addresses
addr_list = Socket.ip_address_list
addresses = []
addr_list.each do |addr|
addresses << addr.ip_address
end
addresses
end
def stop
unless @manager.nil?
@manager.stop
end
@manager = nil
true
end
def request(uuid)
# return requests here
send, _recv = @router.reverse_route(uuid)
if send.length > 0
result = send.pop
end
result
end
def respond(uuid, data)
_send, recv = @router.get_forward(uuid)
recv << data unless recv.nil?
true
end
def register_response_channel(io)
# not implemented "client only method"
response = "register_response_channel not implemented on server"
Logger.log response
response
end
end # class Server
end
end