lib/rex/post/meterpreter/packet_dispatcher.rb
# -*- coding: binary -*-
require 'rex/post/meterpreter/command_mapper'
require 'rex/post/meterpreter/packet_response_waiter'
require 'rex/exceptions'
require 'pathname'
module Rex
module Post
module Meterpreter
###
#
# Exception thrown when a request fails.
#
###
class RequestError < ArgumentError
def initialize(command_id, einfo, ecode=nil)
command_name = Rex::Post::Meterpreter::CommandMapper.get_command_name(command_id)
@method = command_name || "##{command_id}"
@result = einfo
@code = ecode || einfo
end
def to_s
"#{@method}: Operation failed: #{@result}"
end
# The method that failed.
attr_reader :method
# The error result that occurred, typically a windows error message.
attr_reader :result
# The error result that occurred, typically a windows error code.
attr_reader :code
end
###
#
# Handles packet transmission, reception, and correlation,
# and processing
#
###
module PacketDispatcher
# Default time, in seconds, to wait for a response after sending a packet
PACKET_TIMEOUT = 600
# Number of seconds to wait without getting any packets before we try to
# send a liveness check. A minute should be generous even on the highest
# latency networks
#
# @see #keepalive
PING_TIME = 60
# This mutex is used to lock out new commands during an
# active migration. Unused if this is a passive dispatcher
attr_accessor :comm_mutex
# The guid that identifies an active Meterpreter session
attr_accessor :session_guid
# This contains the key material used for TLV encryption
attr_accessor :tlv_enc_key
# Passive Dispatching
#
# @return [Rex::ServiceManager]
# @return [nil] if this is not a passive dispatcher
attr_accessor :passive_service
# @return [Array]
attr_accessor :send_queue
# @return [Array]
attr_accessor :recv_queue
def initialize_passive_dispatcher
self.send_queue = []
self.recv_queue = []
self.waiters = []
self.alive = true
end
def shutdown_passive_dispatcher
self.alive = false
self.send_queue = []
self.recv_queue = []
self.waiters = []
end
def on_passive_request(cli, req)
begin
self.last_checkin = ::Time.now
resp = send_queue.shift
cli.send_response(resp)
rescue => e
send_queue.unshift(resp) if resp
elog("Exception sending a reply to the reader request #{cli.inspect}", error: e)
end
end
##
#
# Transmission
#
##
#
# Sends a packet without waiting for a response.
#
def send_packet(packet, opts={})
if self.pivot_session
opts[:session_guid] = self.session_guid
opts[:tlv_enc_key] = self.tlv_enc_key
return self.pivot_session.send_packet(packet, opts)
end
if opts[:completion_routine]
add_response_waiter(packet, opts[:completion_routine], opts[:completion_param])
end
session_guid = self.session_guid
tlv_enc_key = self.tlv_enc_key
# if a session guid is provided, use all the details provided
if opts[:session_guid]
session_guid = opts[:session_guid]
tlv_enc_key = opts[:tlv_enc_key]
end
log_packet(packet, :send)
bytes = 0
raw = packet.to_r(session_guid, tlv_enc_key)
err = nil
# Short-circuit send when using a passive dispatcher
if self.passive_service
send_queue.push(raw)
return raw.size # Lie!
end
if raw
self.comm_mutex.synchronize do
begin
bytes = self.sock.write(raw)
rescue ::Exception => e
err = e
end
end
if bytes.to_i == 0
# Mark the session itself as dead
self.alive = false
# Reraise the error to the top-level caller
raise err if err
end
end
return bytes
end
#
# Sends a packet and waits for a timeout for the given time interval.
#
# @param packet [Packet] request to send
# @param timeout [Integer,nil] seconds to wait for response, or nil to ignore the
# response and return immediately
# @return (see #send_packet_wait_response)
def send_request(packet, timeout = self.response_timeout)
response = send_packet_wait_response(packet, timeout)
if timeout.nil?
return nil
elsif response.nil?
raise Rex::TimeoutError.new("Send timed out")
elsif (response.result != 0)
einfo = lookup_error(response.result)
e = RequestError.new(packet.method, einfo, response.result)
e.set_backtrace(caller)
raise e
end
return response
end
#
# Transmits a packet and waits for a response.
#
# @param packet [Packet] the request packet to send
# @param timeout [Integer,nil] number of seconds to wait, or nil to wait
# forever
def send_packet_wait_response(packet, timeout)
if packet.type == PACKET_TYPE_REQUEST && commands.present?
# XXX: Remove this condition once the payloads gem has had another major version bump from 2.x to 3.x and
# rapid7/metasploit-payloads#451 has been landed to correct the `enumextcmd` behavior on Windows. Until then, skip
# proactive validation of Windows core commands. This is not the only instance of this workaround.
windows_core = base_platform == 'windows' && (packet.method - (packet.method % COMMAND_ID_RANGE)) == Rex::Post::Meterpreter::ClientCore.extension_id
unless windows_core || commands.include?(packet.method)
if (ext_name = Rex::Post::Meterpreter::ExtensionMapper.get_extension_name(packet.method))
unless ext.aliases.include?(ext_name)
raise RequestError.new(packet.method, "The command requires the #{ext_name} extension to be loaded")
end
end
raise RequestError.new(packet.method, "The command is not supported by this Meterpreter type (#{session_type})")
end
end
# First, add the waiter association for the supplied packet
waiter = add_response_waiter(packet)
bytes_written = send_packet(packet)
# Transmit the packet
if (bytes_written.to_i <= 0)
# Remove the waiter if we failed to send the packet.
remove_response_waiter(waiter)
return nil
end
if not timeout
return nil
end
# Wait for the supplied time interval
response = waiter.wait(timeout)
# Remove the waiter from the list of waiters in case it wasn't
# removed. This happens if the waiter timed out above.
remove_response_waiter(waiter)
# wire in the UUID for this, as it should be part of every response
# packet
if response && !self.payload_uuid
uuid = response.get_tlv_value(TLV_TYPE_UUID)
self.payload_uuid = Msf::Payload::UUID.new({:raw => uuid}) if uuid
end
# Return the response packet, if any
return response
end
# Send a ping to the server.
#
# Our 'ping' is a check for eof on channel id 0. This method has no side
# effects and always returns an answer (regardless of the existence of chan
# 0), which is all that's needed for a liveness check. The answer itself is
# unimportant and is ignored.
#
# @return [void]
def keepalive
if @ping_sent
if ::Time.now.to_i - last_checkin.to_i > PING_TIME*2
dlog("No response to ping, session #{self.sid} is dead", LEV_3)
self.alive = false
end
else
pkt = Packet.create_request(COMMAND_ID_CORE_CHANNEL_EOF)
pkt.add_tlv(TLV_TYPE_CHANNEL_ID, 0)
add_response_waiter(pkt, Proc.new { @ping_sent = false })
send_packet(pkt)
@ping_sent = true
end
end
##
#
# Reception
#
##
def pivot_keepalive_start
return unless self.send_keepalives
self.receiver_thread = Rex::ThreadFactory.spawn("PivotKeepalive", false) do
while self.alive
begin
Rex::sleep(PING_TIME)
keepalive
rescue ::Exception => e
dlog("Exception caught in pivot keepalive: #{e.class}: #{e}", 'meterpreter', LEV_1)
dlog("Call stack: #{e.backtrace.join("\n")}", 'meterpreter', LEV_2)
self.alive = false
break
end
end
end
end
#
# Monitors the PacketDispatcher's sock for data in its own
# thread context and parsers all inbound packets.
#
def monitor_socket
# Skip if we are using a passive dispatcher
return if self.passive_service
# redirect to pivot keepalive if we're a pivot session
return pivot_keepalive_start if self.pivot_session
self.comm_mutex = ::Mutex.new
self.waiters = []
# This queue is where the new incoming packets end up
@new_packet_queue = ::Queue.new
# This is where we put packets that aren't new, but haven't
# yet been handled.
@incomplete_queue = ::Queue.new
@ping_sent = false
# Spawn a thread for receiving packets
self.receiver_thread = Rex::ThreadFactory.spawn("MeterpreterReceiver", false) do
while (self.alive)
begin
rv = Rex::ThreadSafe.select([ self.sock.fd ], nil, nil, PING_TIME)
if rv
packet = receive_packet
# Always enqueue the new packets onto the new packet queue
@new_packet_queue << decrypt_inbound_packet(packet) if packet
elsif self.send_keepalives && @new_packet_queue.empty?
keepalive
end
rescue ::Exception => e
dlog("Exception caught in monitor_socket: #{e.class}: #{e}", 'meterpreter', LEV_1)
dlog("Call stack: #{e.backtrace.join("\n")}", 'meterpreter', LEV_2)
self.alive = false
break
end
end
end
# Spawn a new thread that monitors the socket
self.dispatcher_thread = Rex::ThreadFactory.spawn("MeterpreterDispatcher", false) do
begin
while (self.alive)
# This is where we'll store incomplete packets on
# THIS iteration
incomplete = []
# The backlog is the full list of packets that aims
# to be handled this iteration
backlog = []
# If we have any left over packets from the previous
# iteration, we need to prioritise them over the new
# packets. If we don't do this, then we end up in
# situations where data on channels can be processed
# out of order. We don't do a blocking wait here via
# the .pop method because we don't want to block, we
# just want to dump the queue.
while @incomplete_queue.length > 0
backlog << @incomplete_queue.pop
end
# If the backlog is empty, we don't have old/stale
# packets hanging around, so perform a blocking wait
# for the next packet
backlog << @new_packet_queue.pop if backlog.length == 0
# At this point we either received a packet off the wire
# or we had a backlog to process. In either case, we
# perform a non-blocking queue dump to fill the backlog
# with every packet we have.
while @new_packet_queue.length > 0
backlog << @new_packet_queue.pop
end
#
# Prioritize message processing here
# 1. Close should always be processed at the end
# 2. Command responses always before channel data
#
tmp_command = []
tmp_channel = []
tmp_close = []
backlog.each do |pkt|
if(pkt.response?)
tmp_command << pkt
next
end
if(pkt.method == COMMAND_ID_CORE_CHANNEL_CLOSE)
tmp_close << pkt
next
end
tmp_channel << pkt
end
backlog = []
backlog.push(*tmp_command)
backlog.push(*tmp_channel)
backlog.push(*tmp_close)
#
# Process the message queue
#
backlog.each do |pkt|
begin
unless dispatch_inbound_packet(pkt)
# Keep Packets in the receive queue until a handler is registered
# for them. Packets will live in the receive queue for up to
# PACKET_TIMEOUT seconds, after which they will be dropped.
#
# A common reason why there would not immediately be a handler for
# a received Packet is in channels, where a connection may
# open and receive data before anything has asked to read.
if (::Time.now.to_i - pkt.created_at.to_i < PACKET_TIMEOUT)
incomplete << pkt
end
end
rescue ::Exception => e
dlog("Dispatching exception with packet #{pkt}: #{e} #{e.backtrace}", 'meterpreter', LEV_1)
end
end
# If the backlog and incomplete arrays are the same, it means
# dispatch_inbound_packet wasn't able to handle any of the
# packets. When that's the case, we can get into a situation
# where @new_packet_queue is not empty and, since nothing else bounds this
# loop, we spin CPU trying to handle packets that can't be
# handled. Sleep here to treat that situation as though the
# queue is empty.
if (backlog.length > 0 && backlog.length == incomplete.length)
::IO.select(nil, nil, nil, 0.10)
end
# If we have any packets that weren't handled, they go back
# on the incomplete queue so that they're prioritised over
# new packets that are coming in off the wire.
dlog("Requeuing #{incomplete.length} packet(s)", 'meterpreter', LEV_1) if incomplete.length > 0
while incomplete.length > 0
@incomplete_queue << incomplete.shift
end
# If the old queue of packets gets too big...
if(@incomplete_queue.length > 100)
removed = []
# Drop a bunch of them.
(1..25).each {
removed << @incomplete_queue.pop
}
dlog("Backlog has grown to over 100 in monitor_socket, dropping older packets: #{removed.map{|x| x.inspect}.join(" - ")}", 'meterpreter', LEV_1)
end
end
rescue ::Exception => e
dlog("Exception caught in monitor_socket dispatcher: #{e.class} #{e} #{e.backtrace}", 'meterpreter', LEV_1)
ensure
self.receiver_thread.kill if self.receiver_thread
end
end
end
#
# Parses data from the dispatcher's sock and returns a Packet context
# once a full packet has been received.
#
def receive_packet
packet = parser.recv(self.sock)
if packet
packet.parse_header!
if self.session_guid == NULL_GUID
self.session_guid = packet.session_guid.dup
end
end
packet
end
#
# Stop the monitor
#
def monitor_stop
if self.receiver_thread
self.receiver_thread.kill
self.receiver_thread.join
self.receiver_thread = nil
end
if self.dispatcher_thread
self.dispatcher_thread.kill
self.dispatcher_thread.join
self.dispatcher_thread = nil
end
end
##
#
# Waiter registration
#
##
#
# Adds a waiter association with the supplied request packet.
#
def add_response_waiter(request, completion_routine = nil, completion_param = nil)
if self.pivot_session
return self.pivot_session.add_response_waiter(request, completion_routine, completion_param)
end
waiter = PacketResponseWaiter.new(request.rid, completion_routine, completion_param)
self.waiters << waiter
return waiter
end
#
# Notifies a whomever is waiting for a the supplied response,
# if anyone.
#
def notify_response_waiter(response)
if self.pivot_session
return self.pivot_session.notify_response_waiter(response)
end
handled = false
self.waiters.each() { |waiter|
if (waiter.waiting_for?(response))
waiter.notify(response)
remove_response_waiter(waiter)
handled = true
break
end
}
return handled
end
#
# Removes a waiter from the list of waiters.
#
def remove_response_waiter(waiter)
if self.pivot_session
self.pivot_session.remove_response_waiter(waiter)
else
self.waiters.delete(waiter)
end
end
##
#
# Dispatching
#
##
#
# Initializes the inbound handlers.
#
def initialize_inbound_handlers
@inbound_handlers = []
end
#
# Decrypt the given packet with the appropriate key depending on
# if this session is a pivot session or not.
#
def decrypt_inbound_packet(packet)
pivot_session = self.find_pivot_session(packet.session_guid)
tlv_enc_key = self.tlv_enc_key
tlv_enc_key = pivot_session.pivoted_session.tlv_enc_key if pivot_session
packet.from_r(tlv_enc_key)
packet
end
#
# Dispatches and processes an inbound packet. If the packet is a
# response that has an associated waiter, the waiter is notified.
# Otherwise, the packet is passed onto any registered dispatch
# handlers until one returns success.
#
def dispatch_inbound_packet(packet)
handled = false
log_packet(packet, :recv)
# Update our last reply time
self.last_checkin = ::Time.now
pivot_session = self.find_pivot_session(packet.session_guid)
pivot_session.pivoted_session.last_checkin = self.last_checkin if pivot_session
# If the packet is a response, try to notify any potential
# waiters
if packet.response? && notify_response_waiter(packet)
return true
end
# Enumerate all of the inbound packet handlers until one handles
# the packet
@inbound_handlers.each { |handler|
handled = nil
begin
if packet.response?
handled = handler.response_handler(self, packet)
else
handled = handler.request_handler(self, packet)
end
rescue ::Exception => e
dlog("Exception caught in dispatch_inbound_packet: handler=#{handler} #{e.class} #{e} #{e.backtrace}", 'meterpreter', LEV_1)
return true
end
if (handled)
break
end
}
return handled
end
#
# Registers an inbound packet handler that implements the
# InboundPacketHandler interface.
#
def register_inbound_handler(handler)
@inbound_handlers << handler
end
#
# Deregisters a previously registered inbound packet handler.
#
def deregister_inbound_handler(handler)
@inbound_handlers.delete(handler)
end
def initialize_tlv_logging(opt)
self.tlv_logging_error_occured = false
self.tlv_log_file = nil
self.tlv_log_file_path = nil
self.tlv_log_output = :none
if opt.casecmp?('console') || opt.casecmp?('true')
self.tlv_log_output = :console
elsif opt.start_with?('file:')
self.tlv_log_output = :file
self.tlv_log_file_path = opt.split('file:').last
end
end
protected
attr_accessor :receiver_thread # :nodoc:
attr_accessor :dispatcher_thread # :nodoc:
attr_accessor :waiters # :nodoc:
attr_accessor :tlv_log_output # :nodoc:
attr_accessor :tlv_log_file # :nodoc:
attr_accessor :tlv_log_file_path # :nodoc:
attr_accessor :tlv_logging_error_occured # :nodoc:
def shutdown_tlv_logging
self.tlv_log_output = :none
self.tlv_log_file.close unless self.tlv_log_file.nil?
self.tlv_log_file = nil
self.tlv_log_file_path = nil
end
def log_packet(packet, packet_type)
# if we previously failed to log, return
return if self.tlv_logging_error_occured || self.tlv_log_output == :none
if self.tlv_log_output == :console
log_packet_to_console(packet, packet_type)
elsif self.tlv_log_output == :file
log_packet_to_file(packet, packet_type)
end
end
def log_packet_to_console(packet, packet_type)
if packet_type == :send
print "\n%redSEND%clr: #{packet.inspect}\n"
elsif packet_type == :recv
print "\n%bluRECV%clr: #{packet.inspect}\n"
end
end
def log_packet_to_file(packet, packet_type)
pathname = ::Pathname.new(self.tlv_log_file_path.split('file:').last)
begin
if self.tlv_log_file.nil? || self.tlv_log_file.path != pathname.to_s
self.tlv_log_file.close unless self.tlv_log_file.nil?
self.tlv_log_file = ::File.open(pathname, 'a+')
end
if packet_type == :recv
self.tlv_log_file.puts("\nRECV: #{packet.inspect}\n")
elsif packet_type == :send
self.tlv_log_file.puts("\nSEND: #{packet.inspect}\n")
end
rescue ::StandardError => e
self.tlv_logging_error_occured = true
print_error "Failed writing to TLV Log File: #{pathname} with error: #{e.message}. Turning off logging for this session: #{self.inspect}..."
elog(e)
shutdown_tlv_logging
return
end
end
end
module HttpPacketDispatcher
def initialize_passive_dispatcher
super
# Ensure that there is only one leading and trailing slash on the URI
resource_uri = "/" + self.conn_id.to_s.gsub(/(^\/|\/$)/, '') + "/"
self.passive_service = self.passive_dispatcher
self.passive_service.remove_resource(resource_uri)
self.passive_service.add_resource(resource_uri,
'Proc' => Proc.new { |cli, req| on_passive_request(cli, req) },
'VirtualDirectory' => true
)
# Add a reference count to the handler
self.passive_service.ref
end
def shutdown_passive_dispatcher
if self.passive_service
# Ensure that there is only one leading and trailing slash on the URI
resource_uri = "/" + self.conn_id.to_s.gsub(/(^\/|\/$)/, '') + "/"
self.passive_service.remove_resource(resource_uri) if self.passive_service
self.passive_service.deref
self.passive_service = nil
end
super
end
def on_passive_request(cli, req)
begin
resp = Rex::Proto::Http::Response.new(200, "OK")
resp['Content-Type'] = 'application/octet-stream'
resp['Connection'] = 'close'
self.last_checkin = ::Time.now
if req.method == 'GET'
rpkt = send_queue.shift
resp.body = rpkt || ''
begin
cli.send_response(resp)
rescue ::Exception => e
send_queue.unshift(rpkt) if rpkt
elog("Exception sending a reply to the reader request #{cli.inspect}", error: e)
end
else
resp.body = ""
if req.body and req.body.length > 0
packet = Packet.new(0)
packet.add_raw(req.body)
packet.parse_header!
packet = decrypt_inbound_packet(packet)
dispatch_inbound_packet(packet)
end
cli.send_response(resp)
end
rescue ::Exception => e
elog("Exception handling request: #{cli.inspect} #{req.inspect}", error: e)
end
end
end
end; end; end