require 'cron_tab' # for understanding CronTab format for expiring responses.
require 'confstruct'
# A Collection object encapsulates a given UmlautRequest and a given
# list of Umlaut services that should be run off that request.
# That's exactly what it's initialized with: an umlaut request and a
# list of service definitions. An optional third parameter is an umlaut
# configuration object, from which various timeout values are read; if you
# don't pass one in, defaults will be used.
# The Collection holds and executes the logic for running those services,
# foreground and background, making sure no service is run twice if it's
# already in progress, timing out expired services, etc.
# This code is a mess, sorry.
class Collection
attr_accessor :umlaut_request
attr_accessor :logger
# configs
attr_accessor :response_expire_interval, :response_expire_crontab_format, :background_service_timeout, :requeue_failedtemporary_services_in
# generally only set to true in testing, can be set for the whole class
# or for particular Collection instances.
class_attribute :forward_background_exceptions
self.forward_background_exceptions = true
# Builds a Collection for one request and a set of service definitions.
#
# a_umlaut_request - an UmlautRequest, representing a request for services
#   for a context object; stored as #umlaut_request.
# service_hash - a hash of hashes with service definitions, as would be in
#   services.yml (keyed by service id, judging by #gather_services); kept,
#   without copying, as @service_definitions_flat.
# config - a Confstruct::Configuration associated with the current
#   controller. Only a few keys relevant to service execution are read from
#   it, each via config.lookup!(key, default):
#     "response_expire_interval"
#     "response_expire_crontab_format"       (default nil)
#     "background_service_timeout"           (default 30.seconds)
#     "requeue_failedtemporary_services_in"  (default 500.seconds)
#   You don't need to pass one in; the intent is that a blank configuration
#   is used, so every setting takes its default value.
#
# NOTE(review): this copy of the method appears truncated. The default value
# after `config =` is missing, and the default argument of the
# "response_expire_interval" lookup is missing. No statement follows the
# "Arrange services by type and priority" comment, so nothing visible here
# fills @service_definitions (presumably #gather_services belongs there --
# confirm against upstream). The method's closing `end` is not visible.
def initialize(a_umlaut_request, service_hash, config =
self.umlaut_request = a_umlaut_request
self.logger = Rails.logger
self.response_expire_interval = config.lookup!("response_expire_interval",
self.response_expire_crontab_format = config.lookup!("response_expire_crontab_format", nil)
self.background_service_timeout = config.lookup!("background_service_timeout", 30.seconds)
self.requeue_failedtemporary_services_in = config.lookup!("requeue_failedtemporary_services_in", 500.seconds)
# @service_definitions will be a two-level hash pointing to arrays. Task is Standard, LinkOut, etc.
# { [task] => { [priority_level] => [config1, config2, config3],
# [priority_level_2] => [configa], }
# [...]
# }
@service_definitions_flat = service_hash
@service_definitions = {}
# Arrange services by type and priority in @service_definitions
# Starts running all services that are in this collection, for the request
# set on this collection, if and only if they are not already in progress.
# This method can be run on a request multiple times; it only re-executes
# services that are executable (not already running, or timed out).
# That characteristic is used when this method is called on a page refresh
# or a background update status check.
# Intended flow: set all services in the collection to 'queued' status where
# appropriate, then actually execute the services that are dispatchable
# (queued).
# Returns the Thread object used for dispatching background services (the
# return value of #dispatch_background!).
#
# NOTE(review): the visible body only calls #dispatch_background!. Nothing
# here freshens or queues dispatches or runs foreground services, although
# the description above says it should. This looks truncated (presumably
# calls such as #freshen_dispatches!, #mark_queued_if_empty! and
# #dispatch_foreground! belong before the return -- confirm against
# upstream). The closing `end` is not visible either.
def dispatch_services!
# return main thread for background services.
return dispatch_background!
# Runs this collection's runnable foreground services in priority waves,
# one level at a time for levels 0 through 9, skipping any level with no
# runnable services. Afterwards the UmlautRequest object is meant to be
# reloaded in the current thread, to pick up any changes made by services
# running in threads.
#
# NOTE(review): this copy looks damaged. The line after
# instantiate_services! has lost the expression that should receive
# `, priority).handle(...)`; presumably it is a ServiceWave built from
# services_to_run and priority -- confirm. No reload statement follows the
# trailing comment, and the closing `end`s are not visible.
def dispatch_foreground!
# Foreground services
(0..9).each do | priority |
services_to_run = self.instantiate_services!(:level => priority, :ids => runnable_services_for_priority(priority))
next if services_to_run.empty? , priority).handle(umlaut_request, umlaut_request.session_id)
# Need to reload the request from db, so it gets changes
# made by services in threads, so future code (such as view rendering)
# will see changes.
# Runs this collection's background services in priority waves, levels 'a'
# through 'z', inside a separate thread that is not joined: it keeps working
# after the response has been returned to the browser. If services from an
# earlier wave are still in progress, later waves are not run (see
# #runnable_services_for_priority). This guards against multiple HTTP
# requests arriving while services are in progress.
# Returns the Thread object used for dispatching background services.
#
# NOTE(review): this copy looks damaged -- confirm against upstream:
# * the thread constructor before `, umlaut_request) do` is missing (the
#   block parameters t_collection/t_request suggest the thread is started
#   with the collection and the request as arguments);
# * the "give the thread scheduler another hint" comment has no statement
#   after it;
# * inside the wave loop, the `next` for an empty wave and the expression
#   that should receive `, priority).handle(...)` (presumably a ServiceWave,
#   per the comments below) are missing;
# * the thread body uses self/umlaut_request rather than
#   t_collection/t_request;
# * the statement returning backgroundThread and the closing `end`s are not
#   visible.
# NOTE(review): `rescue Exception` also catches non-StandardError exceptions
# such as Interrupt and SystemExit; consider StandardError if that is not
# intended.
def dispatch_background!
# Now we do some crazy magic, start a Thread to run our background
# services. We are NOT going to wait for this thread to join,
# we're going to let it keep doing its thing in the background after
# we return a response to the browser
backgroundThread =, umlaut_request) do | t_collection, t_request|
# Tell our AR extension not to allow implicit checkouts
ActiveRecord::Base.forbid_implicit_checkout_for_thread! if ActiveRecord::Base.respond_to?("forbid_implicit_checkout_for_thread!")
# Set priority to lower for background thread; may or may not
# actually have an effect in MRI, unclear, but can't hurt.
prior = Thread.current.priority
Thread.current.priority = prior - 1
# Try to give the thread scheduler another hint, really, get
# other stuff done before this thread.
force_refresh = false
('a'..'z').each do | priority |
# force refresh only if we just ran some services, otherwise not enough
# time has gone by to be worthwhile.
runnable_ids = runnable_services_for_priority(priority, :refresh => force_refresh)
services_to_run = self.instantiate_services!(:level => priority, :ids => runnable_ids)
if services_to_run.empty?
force_refresh = false
end , priority).handle(umlaut_request, umlaut_request.session_id)
force_refresh = true
rescue Exception => e
# We are divorced from any HTTP request at this point, and may not
# have access to an ActiveRecord connection. Not much
# we can do except log it.
# If we're catching an exception here, service processing was
# probably interrupted, which is bad. You should not intentionally
# raise exceptions to be caught here.
# Normally even unexpected exceptions were caught inside the ServiceWave,
# and logged to db as well as logfile if possible, only bugs in ServiceWave
# itself should wind up caught here.
# Record the exception in the thread-local :exception slot so it can be
# inspected on the Thread object later.
Thread.current[:exception] = e
logger.error("Background Service execution exception: #{e.inspect}\n " + Umlaut::Util.clean_backtrace(e).join("\n "))
# One exception is in test environment, when we may be intentionally
# trying to get exceptions to propagate up from ServiceWave to here,
# and then onward, in order to be caught by tests.
if self.forward_background_exceptions
raise e
# Goes through the request's existing DispatchedService objects and
# freshens them up:
# * If a service has been marked in progress (or is still queued) for
#   longer than background_service_timeout, mark it failed temporary. A
#   timeout exception is recorded unless one is already stored, and a
#   warning is logged.
# * If a completed dispatch has expired (see #completed_dispatch_expired?),
#   or a failed-temporary one is older than
#   requeue_failedtemporary_services_in, delete the dispatch (and all its
#   responses) so it can be re-queued.
#
# NOTE(review): this copy looks damaged -- confirm against upstream:
# * both elapsed-time checks read `( - ds.updated_at)` with the left
#   operand missing (presumably the current time);
# * the store_exception(...) call has an unmatched `)` (a wrapping
#   constructor appears to be missing);
# * `DispatchedService::FailedTemporary!` looks like the status assignment
#   and a save merged together;
# * the warning's first interpolation `#{}` is empty;
# * the second `if` condition is missing closing parentheses;
# * the method ends mid-loop: the deletion of matching responses and of the
#   dispatch itself, and the closing `end`s, are not visible.
def freshen_dispatches!
umlaut_request.dispatched_services.each do | ds |
# go through dispatched_services and set still in progress but too long to failed temporary
if ( (ds.status == DispatchedService::InProgress ||
ds.status == DispatchedService::Queued ) &&
( - ds.updated_at) > self.background_service_timeout)
ds.store_exception("background service timed out (took longer than #{self.background_service_timeout} to run); thread assumed dead.")) unless ds.exception_info
# Fail it temporary, it'll be run again.
ds.status = DispatchedService::FailedTemporary!
logger.warn("Background service timed out, thread assumed dead. #{} / #{ds.service_id}")
# go through dispatched_services and delete:
# 1) old completed dispatches, too old to use.
# 2) failedtemporary dispatches that are older than our resurrection time
# -> And all responses associated with those dispatches.
# After being deleted, they'll end up re-queued.
if ( (ds.completed? && completed_dispatch_expired?(ds) ) ||
( ds.status == DispatchedService::FailedTemporary &&
( - ds.updated_at) > self.requeue_failedtemporary_services_in
# Need to expire. Delete all the service responses, and
# the DispatchedService record, and service will be automatically
# run again.
serv_id = ds.service_id
umlaut_request.service_responses.each do |response|
if response.service_id == serv_id
# For every service configured in this collection that has no
# DispatchedService record on the request yet, create and save one with
# status DispatchedService::Queued. Services that already have a dispatch,
# whatever its status, are left untouched.
#
# Returns the array of service ids that were newly queued (empty if none).
def mark_queued_if_empty!
  configured_ids = get_service_definitions.map { |d| d["service_id"] }
  dispatched_ids = umlaut_request.dispatched_services.map(&:service_id)

  (configured_ids - dispatched_ids).each do |service_id|
    umlaut_request.new_dispatch_object!(service_id, DispatchedService::Queued).save!
  end
end
# Returns the ids of all services at the given priority whose dispatch is
# marked Queued, as long as no service from an earlier wave is still marked
# InProgress; otherwise returns an empty array.
# Pass `:refresh => true` as second argument to force a trip
# to the database to get fresh DispatchedService objects.
# The work runs inside DispatchedService.connection_pool.with_connection.
#
# Priorities are compared as strings (priority.to_s), so numeric foreground
# levels "0".."9" sort before lettered background levels "a".."z".
#
# NOTE(review): this copy looks damaged. The Hash[ ... ] expression is never
# closed (its block's `end` and the closing `]` are missing). The
# `if options[:refresh]` branch contains only a comment; the statement that
# refreshes umlaut_request.dispatched_services is missing. The method's
# closing `end`s are not visible. Confirm against upstream.
# NOTE(review): the map is built from d["priority"].to_s. #gather_services
# files a definition with no 'priority' key at level 3 without writing that
# default back, so such a service maps to "" here and would never match
# priority 3. Also, a dispatch whose service_id is not among this
# collection's definitions maps to nil, and `nil < priority.to_s` would
# raise NoMethodError. Verify both cases.
def runnable_services_for_priority(priority, options = {})
DispatchedService.connection_pool.with_connection do
service_definitions = self.get_service_definitions
# Make a hash where key is service id, and value is priority.to_s
service_to_priority = Hash[
service_definitions.collect do |d|
[ d["service_id"], d["priority"].to_s ]
if options[:refresh]
# force a refresh
# If there is any service earlier than this wave still marked InProgress,
# we're not ready to run this wave, return empty array.
# Important to avoid race condition on HTTP requests, don't
# dispatch later background waves unless earlier are actually complete,
# even on an HTTP status check.
previous_waves_running = umlaut_request.dispatched_services.find do |ds|
ds.status == DispatchedService::InProgress &&
service_to_priority[ ds.service_id ] < priority.to_s
return [] if previous_waves_running
# otherwise, the services for this priority are runnable if
# they are already marked Queued
# We use .to_a, we want to use the already in memory array, not
# go to the db here.
return umlaut_request.dispatched_services.to_a.find_all do |ds|
ds.status == DispatchedService::Queued &&
service_to_priority[ ds.service_id ] == priority.to_s
end.collect {|ds| ds.service_id}
# Decides whether a completed dispatch is too old to reuse, based on the
# configured response_expire_interval and/or response_expire_crontab_format.
# Returns nil when neither is configured. Otherwise the result is truthy
# when the dispatch was created more than response_expire_interval ago, or
# (crontab case) when the current time is past a point derived from the
# crontab schedule and ds.created_at.
#
# NOTE(review): this copy looks truncated. The right-hand side of `now =` is
# missing (presumably the current time). The crontab comparison on the
# `expired_crontab` line is cut off; the file requires 'cron_tab' for this,
# and presumably it compares against the next scheduled time after
# ds.created_at -- confirm. The closing `end` is not visible.
def completed_dispatch_expired?(ds)
interval = self.response_expire_interval
crontab = self.response_expire_crontab_format
now =
return nil unless interval || crontab
expired_interval = interval && (now - ds.created_at > interval)
expired_crontab = crontab && (now >
return expired_interval || expired_crontab
# Instantiate fresh service objects for the service definitions in this
# collection, each bound to this collection's umlaut_request. The options
# below select which definitions are used and can be combined:
#   :task  => Service::StandardTask (default) or Service::LinkOutFilterTask
#   :level => priority level; when omitted, services from all levels.
#   :ids   => array of service ids; only those services.
#
# Returns an array of service instances, in the order the matching
# definitions are returned by #get_service_definitions.
def instantiate_services!(options = {})
  get_service_definitions(options).map do |svc_def|
    ServiceStore.instantiate_service!(svc_def, umlaut_request)
  end
end
# @deprecated Use #instantiate_services! with
#   :task => Service::LinkOutFilterTask and :level => level instead.
#
# Instantiates the link-out filter services configured at the given
# priority level.
def link_out_service_level(level)
  instantiate_services!(:task => Service::LinkOutFilterTask, :level => level)
end
# Get the service definition hashes configured in this collection, as an
# array, filtered by options:
#   :task  => Service::StandardTask (default) or Service::LinkOutFilterTask
#   :level => priority level; when omitted (nil), definitions from every
#             level of the task are returned.
#   :ids   => array of service unique ids; only those definitions are kept.
#
# When :level is given and :ids is not, the returned array is the one
# Collection stores internally, so clients really ought not to mutate it.
# The caller's options hash is not modified.
#
# Returns an array of definition hashes (empty if nothing matches).
def get_service_definitions(options = {})
  task = options[:task] || Service::StandardTask
  configs_for_task = @service_definitions[task] || {}

  service_configs =
    if options[:level].nil?
      # All of them for this task, across every priority level.
      configs_for_task.values.flatten(1)
    else
      configs_for_task[options[:level]] || []
    end

  if options[:ids]
    service_configs = service_configs.select { |s| options[:ids].include?(s["service_id"]) }
  end

  service_configs
end
# Arranges the flat service definitions (@service_definitions_flat) into
# @service_definitions, a hash of task => { priority level => [definitions] }.
# * nil definitions are skipped;
# * each definition hash gets its key written into it as 'service_id'
#   (this mutates the hashes passed to the constructor, which are stored
#   without copying);
# * a definition without 'task' is filed under Service::StandardTask, and
#   one without 'priority' under level 3. Note that the default level is
#   not written back into the definition's 'priority' key.
#
# NOTE(review): SOURCE ends here, so the closing `end`s of the loop and the
# method are not visible -- confirm nothing else belongs in the loop.
def gather_services
@service_definitions_flat.each_pair do | unique_id, svc_def |
next if svc_def.nil?
svc_def['service_id'] = unique_id
task = svc_def['task'] || Service::StandardTask
level = svc_def['priority'] || 3
@service_definitions[task] ||= {}
@service_definitions[task][level] ||= []
@service_definitions[task][level] << svc_def