app/models/service_wave.rb
require 'term_color'
# ServiceWave is basically responsible for multi-threaded execution of
# a list of services, all in the same priority. Generally it's only
# called by Collection, nobody else needs it directly.
#
class ServiceWave
attr_accessor :services
attr_accessor :priority_level
attr_reader :config
# Priority level is purely information, used for debug output.
def initialize(service_objects, priority_level = nil, config = UmlautController.umlaut_config)
@services = service_objects
@priority_level = priority_level
@config = config
@log_timing = config.lookup!("log_service_timing", true)
# Don't forward exceptions, that'll interrupt other service processing.
# Catch the exception, record it in the dispatch table, done. May want
# to set this to true for debugging/development, but NOT for production.
@forward_exceptions = false
end
# Safe to call in a thread. Returns true or false depending on
# whether dispatch should proceed.
def prepare_dispatch!(request, service)
return request.register_in_progress(service)
end
def handle(request, session_id)
return if (@services.nil? || @services.empty?)
bundle_start = Time.now
Rails.logger.info(TermColor.color("Umlaut: Launching service wave #{@priority_level} #{'(non-threaded)' unless config.lookup!("threaded_service_wave", true) }", :yellow) + ", request #{request.id}") if @log_timing
threads = []
some_service_executed = false
@services.each do | service |
some_service_executed = true
local_request = nil
service_start = Time.now
if config.lookup!("threaded_service_wave", true)
threads << Thread.new(request.id, service.clone) do | request_id, local_service |
# Deal with ruby's brain dead thread scheduling by setting
# bg threads to a lower priority so they don't interfere with fg
# threads.
Thread.current.priority = -1
# Save some things in thread local hash useful for debugging
Thread.current[:debug_name] = local_service.class.name
Thread.current[:service] = service
# Tell our AR extension not to allow implicit checkouts
ActiveRecord::Base.forbid_implicit_checkout_for_thread! if ActiveRecord::Base.respond_to?("forbid_implicit_checkout_for_thread!")
begin
local_request = Request.connection_pool.with_connection do
# We are attempting to pre-load all relationships, both for efficiency,
# and so our thread can use them all without needing to checkout
# an ActiveRecord connection.
req = Request.includes({:referent => :referent_values}, :service_responses, :dispatched_services).find(request_id)
# It turns out even though referent.referent_values is loaded from the db, on first
# access Rails will still access #connection, triggering a checkout. We force
# that to happen here, in our with_connection, so it won't happen later.
#
# Yeah, this is a hacky mess, ActiveRecord isn't happy using it the way we are.
# Should we just surround all of handle_wrapper in a with_connection checkout?
# Maybe. But it would require more connections in the pool to have those
# longer checkouts.
req.referent.referent_values
req.service_responses
req.dispatched_services
req
end
if prepare_dispatch!(local_request, local_service)
local_service.handle_wrapper(local_request)
else
Rails.logger.info("NOT launching service #{local_service.service_id}, level #{@priority_level}, request #{local_request.id}: not in runnable state") if @log_timing
end
rescue StandardError => e
# We may not be able to access ActiveRecord because it may
# have been an AR connection error, perhaps out of connections
# in the pool. So log and record in non-AR ways.
# the code waiting on our thread will see exception
# reported in Thread local var, and log it AR if possible.
# Log it too, although experience shows it may never make it to the
# log for mysterious reasons.
log_msg = TermColor.color("Umlaut: Threaded service raised exception.", :red, true) + " Service: #{service.service_id}, #{e.class} #{e.message}. Backtrace:\n #{clean_backtrace(e).join("\n ")}"
Rails.logger.error(log_msg)
# And stick it in a thread variable too
Thread.current[:exception] = e
# And try to re-raise if it's one we really don't want to swallow.
# Sorry, a mess.
raise e if defined?(VCR::Errors::UnhandledHTTPRequestError) && e.kind_of?(VCR::Errors::UnhandledHTTPRequestError)
ensure
Rails.logger.info(TermColor.color("Umlaut: Completed service #{local_service.service_id}", :yellow)+ ", level #{@priority_level}, request #{local_request && local_request.id}: in #{Time.now - service_start}.") if @log_timing
end
end
else # not threaded
begin
if prepare_dispatch!(request, service)
service.handle_wrapper(request)
else
Rails.logger.info("NOT launching service #{service.service_id}, level #{@priority_level}, request #{request.id}: not in runnable state") if @log_timing
end
ensure
Rails.logger.info(TermColor.color("Umlaut: Completed service #{service.service_id}", :yellow)+ ", level #{@priority_level}, request #{request && request.id}: in #{Time.now - service_start}.") if @log_timing
end
end
end
# Wait for all the threads to complete, if any.
threads.each do |aThread|
aThread.join
if aThread[:exception]
debugger if ENV["UMLAUT_AUTO_DEBUGGER"]
begin
request.dispatched(aThread[:service], DispatchedService::FailedFatal, aThread[:exception])
rescue Exception => e
debugger if ENV["UMLAUT_AUTO_DEBUGGER"]
raise e
end
end
# Okay, raise if exception, if desired.
if ( aThread[:exception] && self.forward_exceptions? )
raise aThread[:exception]
end
end
threads.clear # more paranoia
Rails.logger.info(TermColor.color("Umlaut: Completed service wave #{@priority_level}", :yellow) + ", request #{request.id}: in #{Time.now - bundle_start}") if some_service_executed && @log_timing
end
def forward_exceptions?
return @forward_exceptions
end
def forward_exceptions=(f)
@foward_excpetions = f
end
protected
def clean_backtrace(exception)
Umlaut::Util.clean_backtrace(exception)
end
end