lib/ronin/recon/worker_pool.rb
# frozen_string_literal: true
#
# ronin-recon - A micro-framework and tool for performing reconnaissance.
#
# Copyright (c) 2023-2024 Hal Brodigan (postmodern.mod3@gmail.com)
#
# ronin-recon is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ronin-recon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with ronin-recon. If not, see <https://www.gnu.org/licenses/>.
#
require_relative 'message/value'
require_relative 'message/worker_started'
require_relative 'message/worker_stopped'
require_relative 'message/job_started'
require_relative 'message/job_completed'
require_relative 'message/job_failed'
require_relative 'message/shutdown'
require 'ronin/core/params/mixin'
require 'async/queue'
module Ronin
  module Recon
    #
    # Contains the `Async::Task` objects for a worker, that process messages
    # from the input queue and send messages to the output queue.
    #
    # @api private
    #
    class WorkerPool

      # The recon worker's ID.
      #
      # @return [String, nil]
      #   The value of `worker.class.id`, or `nil` when the worker's class
      #   does not respond to `id`.
      attr_reader :id

      # The number of async worker tasks to spawn.
      #
      # @return [Integer]
      attr_reader :concurrency

      # The worker object.
      #
      # @return [Worker]
      attr_reader :worker

      # The input queue for the worker(s).
      #
      # @return [Async::Queue]
      attr_reader :input_queue

      # The output queue for the worker(s).
      #
      # @return [Async::Queue]
      attr_reader :output_queue

      # The logger for debug messages.
      #
      # @return [Console::Logger]
      attr_reader :logger

      #
      # Initializes the worker pool.
      #
      # @param [Worker] worker
      #   The initialized worker object.
      #
      # @param [Integer, nil] concurrency
      #   The number of async tasks to spawn. Defaults to the worker class'es
      #   `concurrency` value when omitted or `nil`.
      #
      # @param [Async::Queue] output_queue
      #   The output queue to send discovered values to.
      #
      # @param [Object, nil] params
      #   Accepted for compatibility with existing callers; this class does
      #   not currently read it.
      #
      # @param [Console::Logger] logger
      #   The console logger object.
      #
      def initialize(worker, concurrency: nil,
                             output_queue: ,
                             params: nil,
                             logger: Console.logger)
        worker_class = worker.class

        @worker = worker

        # Previously @id was never assigned, so #id always returned nil.
        # The respond_to? guard keeps workers without a class-level `id`
        # working exactly as before (nil ID).
        @id = worker_class.respond_to?(:id) ? worker_class.id : nil

        @concurrency = concurrency || worker_class.concurrency

        @input_queue  = Async::Queue.new
        @output_queue = output_queue

        @logger = logger

        # populated by #start
        @tasks = nil
      end

      #
      # Routes a message to the worker.
      #
      # @param [Message::Value, Message::SHUTDOWN] mesg
      #   The message to route.
      #
      def enqueue_mesg(mesg)
        case mesg
        when Message::SHUTDOWN
          # every worker task stops after dequeuing a single shutdown
          # message, so push one copy per task
          @concurrency.times do
            @input_queue.enqueue(mesg)
          end
        else
          @input_queue.enqueue(mesg)
        end
      end

      #
      # Runs the worker.
      #
      # Dequeues messages from the input queue and processes each one, until
      # a `Message::SHUTDOWN` message is dequeued. Afterwards a
      # `Message::WorkerStopped` message is sent to the output queue.
      #
      def run
        # HACK: for some reason `until (mesg = ...) == Message::SHUTDOWN)`
        # causes `Message::SHUTDOWN` objects to slip by. Changing it to a
        # `loop do` fixes this for some reason.
        loop do
          mesg = @input_queue.dequeue
          break if mesg == Message::SHUTDOWN

          process_mesg(mesg)
        end

        stopped!
      end

      #
      # Starts the worker pool.
      #
      # Sends a `Message::WorkerStarted` message to the output queue and then
      # spawns {#concurrency} async tasks which each call {#run}.
      #
      # @param [Async::Task] task
      #   The optional async task to register the worker under.
      #
      def start(task=Async::Task.current)
        # mark the worker as running
        started!

        @tasks = []

        @concurrency.times do
          @tasks << task.async { run }
        end
      end

      #
      # Marks the worker pool as running.
      #
      def started!
        # send a message to the engine that the worker pool has started
        enqueue(Message::WorkerStarted.new(@worker))
      end

      #
      # Marks the worker pool as stopped.
      #
      def stopped!
        # send a message to the engine that the worker pool has stopped
        enqueue(Message::WorkerStopped.new(@worker))
      end

      private

      #
      # Processes a single value message from the input queue.
      #
      # Sends a `Message::JobStarted` message, passes the message's value to
      # the worker, sends a `Message::Value` message for every result the
      # worker yields, and finally sends either a `Message::JobCompleted`
      # message or a `Message::JobFailed` message if the worker raised a
      # `StandardError`.
      #
      # @param [Message::Value] mesg
      #   The dequeued message to process.
      #
      def process_mesg(mesg)
        value = mesg.value

        enqueue(Message::JobStarted.new(@worker,value))

        begin
          @worker.process(value) do |result|
            @logger.debug("Output value yielded: #{@worker} #{value.inspect} -> #{result.inspect}")

            # results are one level deeper than the value they came from
            new_value = Message::Value.new(result, worker: @worker,
                                                   parent: value,
                                                   depth:  mesg.depth + 1)
            enqueue(new_value)
          end

          enqueue(Message::JobCompleted.new(@worker,value))
        rescue StandardError => error
          # report the failure instead of letting it kill the worker task
          enqueue(Message::JobFailed.new(@worker,value,error))
        end
      end

      #
      # Sends a message to the output queue.
      #
      # @param [Message::WorkerStarted, Message::WorkerStopped, Message::JobStarted, Message::JobCompleted, Message::JobFailed, Message::Value] mesg
      #   The message object to enqueue.
      #
      def enqueue(mesg)
        @output_queue.enqueue(mesg)
      end

    end
  end
end