team-umlaut/umlaut

View on GitHub
app/models/service_wave.rb

Summary

Maintainability
C
1 day
Test Coverage
require 'term_color'

# ServiceWave is basically responsible for multi-threaded execution of
# a list of services, all in the same priority. Generally it's only
# called by Collection, nobody else needs it directly. 
#
class ServiceWave
  attr_accessor :services
  attr_accessor :priority_level
  attr_reader :config

  # Priority level is purely information, used for debug output. 
  def initialize(service_objects, priority_level = nil, config = UmlautController.umlaut_config)
    @services = service_objects
    @priority_level = priority_level
    @config = config
    @log_timing = config.lookup!("log_service_timing", true)

    # Don't forward exceptions, that'll interrupt other service processing.
    # Catch the exception, record it in the dispatch table, done. May want
    # to set this to true for debugging/development, but NOT for production.
    @forward_exceptions = false
  end
  
  # Safe to call in a thread. Returns true or false depending on
  # whether dispatch should proceed. 
  def prepare_dispatch!(request, service)            
    return request.register_in_progress(service)                    
  end

  def handle(request, session_id)
    return if (@services.nil? || @services.empty?)

    bundle_start = Time.now
    Rails.logger.info(TermColor.color("Umlaut: Launching service wave #{@priority_level} #{'(non-threaded)' unless config.lookup!("threaded_service_wave", true) }", :yellow) + ", request #{request.id}") if @log_timing

    
    
    threads = []
    some_service_executed = false
    @services.each do | service |
      some_service_executed = true
      local_request = nil
      
      service_start = Time.now          

      if config.lookup!("threaded_service_wave", true)

      
        threads << Thread.new(request.id, service.clone) do | request_id, local_service |
          # Deal with ruby's brain dead thread scheduling by setting
          # bg threads to a lower priority so they don't interfere with fg
          # threads.
          Thread.current.priority = -1
  
          # Save some things in thread local hash useful for debugging
          Thread.current[:debug_name] = local_service.class.name
          Thread.current[:service] = service
  
          # Tell our AR extension not to allow implicit checkouts
          ActiveRecord::Base.forbid_implicit_checkout_for_thread! if ActiveRecord::Base.respond_to?("forbid_implicit_checkout_for_thread!")
          
          begin
            local_request = Request.connection_pool.with_connection do
              # We are attempting to pre-load all relationships, both for efficiency,
              # and so our thread can use them all without needing to checkout
              # an ActiveRecord connection. 
              req = Request.includes({:referent => :referent_values}, :service_responses, :dispatched_services).find(request_id)

              # It turns out even though referent.referent_values is loaded from the db, on first
              # access Rails will still access #connection, triggering a checkout. We force
              # that to happen here, in our with_connection, so it won't happen later. 
              #
              # Yeah, this is a hacky mess, ActiveRecord isn't happy using it the way we are. 
              # Should we just surround all of handle_wrapper in a with_connection checkout?
              # Maybe. But it would require more connections in the pool to have those
              # longer checkouts. 
              req.referent.referent_values
              req.service_responses
              req.dispatched_services

              req
            end

              
  
            if prepare_dispatch!(local_request, local_service)
              local_service.handle_wrapper(local_request)
            else
              Rails.logger.info("NOT launching service #{local_service.service_id},  level #{@priority_level}, request #{local_request.id}: not in runnable state") if @log_timing
            end
            
           
          rescue StandardError => e
            # We may not be able to access ActiveRecord because it may
            # have been an AR connection error, perhaps out of connections
            # in the pool. So log and record in non-AR ways. 
            # the code waiting on our thread will see exception
            # reported in Thread local var, and log it AR if possible. 
            
            
            # Log it too, although experience shows it may never make it to the 
            # log for mysterious reasons. 
            log_msg = TermColor.color("Umlaut: Threaded service raised exception.", :red, true) + " Service: #{service.service_id}, #{e.class} #{e.message}. Backtrace:\n  #{clean_backtrace(e).join("\n  ")}"
            Rails.logger.error(log_msg)
            
            # And stick it in a thread variable too
            Thread.current[:exception] = e    

            # And try to re-raise if it's one we really don't want to swallow. 
            # Sorry, a mess. 
            raise e if defined?(VCR::Errors::UnhandledHTTPRequestError) && e.kind_of?(VCR::Errors::UnhandledHTTPRequestError)
          ensure
            Rails.logger.info(TermColor.color("Umlaut: Completed service #{local_service.service_id}", :yellow)+ ",  level #{@priority_level}, request #{local_request && local_request.id}: in #{Time.now - service_start}.") if @log_timing
          end
        end
      else # not threaded
        begin
          if prepare_dispatch!(request, service)
              service.handle_wrapper(request)
          else
              Rails.logger.info("NOT launching service #{service.service_id},  level #{@priority_level}, request #{request.id}: not in runnable state") if @log_timing
          end
        ensure
          Rails.logger.info(TermColor.color("Umlaut: Completed service #{service.service_id}", :yellow)+ ",  level #{@priority_level}, request #{request && request.id}: in #{Time.now - service_start}.") if @log_timing
        end
      end
      
    end

    # Wait for all the threads to complete, if any. 
    threads.each do |aThread|
      aThread.join

      if aThread[:exception] 
        debugger if ENV["UMLAUT_AUTO_DEBUGGER"]
        begin
          request.dispatched(aThread[:service], DispatchedService::FailedFatal, aThread[:exception])
        rescue Exception => e
          debugger if ENV["UMLAUT_AUTO_DEBUGGER"]
          raise e
        end
      end

      # Okay, raise if exception, if desired.
      if ( aThread[:exception] && self.forward_exceptions? )        
        raise aThread[:exception]
      end
    end

    threads.clear # more paranoia


    Rails.logger.info(TermColor.color("Umlaut: Completed service wave #{@priority_level}", :yellow) + ", request #{request.id}: in #{Time.now - bundle_start}") if some_service_executed && @log_timing
  end

  def forward_exceptions?
    return @forward_exceptions
  end
  def forward_exceptions=(f)
    @foward_excpetions = f    
  end
  
  protected
    def clean_backtrace(exception)
      Umlaut::Util.clean_backtrace(exception)
    end

  
end