app/modules/media_poll.rb
# frozen_string_literal: true
# Polls the filesystem, the job status store and (optionally) the redis fast cache
# until requested media becomes available, raising when the wait expires.
class MediaPoll
  HEADER_KEY_RESPONSE_FROM = 'X-Media-Response-From'
  HEADER_KEY_RESPONSE_START = 'X-Media-Response-Start'
  HEADER_VALUE_RESPONSE_CACHE = 'Cache'
  HEADER_VALUE_RESPONSE_REMOTE_CACHE = 'Remote+Fast Cache'
  HEADER_VALUE_RESPONSE_REMOTE = 'Generated Remotely'
  HEADER_VALUE_RESPONSE_LOCAL = 'Generated Locally'
  HEADER_KEY_ELAPSED_TOTAL = 'X-Media-Elapsed-Seconds-Total'
  HEADER_KEY_ELAPSED_PROCESSING = 'X-Media-Elapsed-Seconds-Processing'
  HEADER_KEY_ELAPSED_WAITING = 'X-Media-Elapsed-Seconds-Waiting'
  HEADERS_EXPOSED = [
    'Content-Length',
    HEADER_KEY_RESPONSE_FROM,
    HEADER_KEY_RESPONSE_START,
    HEADER_KEY_ELAPSED_TOTAL,
    HEADER_KEY_ELAPSED_PROCESSING,
    HEADER_KEY_ELAPSED_WAITING
  ].freeze

  # Allowed shape of a path to poll: an optional leading slash, then one or more
  # segments of [0-9a-zA-Z_-.] separated by single slashes (no empty segments,
  # no trailing slash). Written without nested quantifiers so matching is linear;
  # the previous form `(?:/?X+)+` could backtrack exponentially on long
  # non-matching input.
  VALID_PATH_PATTERN = %r{\A/?[0-9a-zA-Z_\-.]+(?:/[0-9a-zA-Z_\-.]+)*\z}.freeze
  private_constant :VALID_PATH_PATTERN

  class << self
    # Block the request until at least one of the expected files exists,
    # waiting up to wait_max seconds.
    # @param [Array<String>] expected_files absolute paths to look for
    # @param [Number] wait_max maximum number of seconds to wait
    # @param [Number] poll_delay seconds to sleep between checks
    # @return [Array<String>] existing files (currently at most one)
    # @raise [CustomErrors::AudioGenerationError] when no file appears in time
    def poll_media(expected_files, wait_max, poll_delay = 0.5)
      poll_locations = prepare_locations(expected_files)
      Rails.logger.debug "MediaPoll#poll_media: Before polling for files #{expected_files}"

      existing_files = []
      poll_result = poll(wait_max, poll_delay) do
        existing_files = get_existing_files(poll_locations)
        Rails.logger.debug "MediaPoll#poll_media: Poll matched files #{existing_files}"
        # truthy stops polling, falsy keeps polling
        !existing_files.empty?
      end

      if poll_result[:result].nil?
        msg = "Media file was not found within #{wait_max} seconds."
        job_info = poll_result.merge(poll_locations: poll_locations, existing_files: existing_files)
        raise CustomErrors::AudioGenerationError.new(msg, job_info)
      end

      Rails.logger.debug "MediaPoll#poll_media: POST-poll matched file #{existing_files} after #{poll_result[:actual_poll_time]}"
      existing_files
    end

    # Block the request until the background job reports completion,
    # waiting up to wait_max seconds.
    # @param [Symbol] _media_type unused; kept for interface compatibility
    # @param [Hash] _media_request_params unused; kept for interface compatibility
    # @param [Number] wait_max maximum number of seconds to wait
    # @param [Number] poll_delay seconds to sleep between checks; this is also the
    #   accuracy of the measured poll time
    # @param [String] job_id id of the job whose status is polled
    # @return [Object] the status returned by BawWorkers::ActiveJob::Status::Persistance.get
    # @raise [CustomErrors::AudioGenerationError] when the job does not complete in time
    def poll_resque(_media_type, _media_request_params, wait_max, poll_delay = 0.5, job_id:)
      # keep the most recent status so it can be reported when polling ends
      status = nil
      poll_result = poll(wait_max, poll_delay) do
        status = BawWorkers::ActiveJob::Status::Persistance.get(job_id)
        # a queued or working job keeps polling until it completes or we time out
        status.present? && status.completed?
      end

      Rails.logger.debug "MediaPoll#poll_resque: Result from resque poll was #{status}."

      if poll_result[:result].nil?
        resque_task_status = status.nil? ? '(none)' : status.status
        msg = "Resque did not complete media request within #{wait_max} seconds, result was '#{resque_task_status}'."
        job_info = poll_result.merge(
          # same accessor poll_resque_and_media uses on Persistance.get results
          uuid: status&.job_id,
          time: status&.time,
          status: resque_task_status
        )
        raise CustomErrors::AudioGenerationError.new(msg, job_info)
      end

      status
    end

    # Block until the media is available from any of three sources: the redis
    # fast cache (when Settings.actions.media.cache_to_redis is enabled), a
    # completed job, or an expected file on disk.
    # @param [Array<String>] expected_files absolute paths to look for; the
    #   basename of the first one is used as the redis cache key
    # @param [Symbol] _media_type unused; kept for interface compatibility
    # @param [Hash] _media_request_params unused; kept for interface compatibility
    # @param [Number] wait_max maximum number of seconds to wait
    # @param [String] job_id id of the job whose status is polled
    # @param [Number] poll_delay seconds to sleep between checks
    # @return [Hash] :in_memory_file (String or nil, the cached bytes),
    #   :existing_files (Array<String>), :resque_status (last polled status)
    # @raise [CustomErrors::AudioGenerationError] when nothing becomes available in time
    def poll_resque_and_media(expected_files, _media_type, _media_request_params, wait_max, job_id:, poll_delay: 0.4)
      poll_locations = prepare_locations(expected_files)
      existing_files = []
      resque_status = nil

      should_check_redis_cache = Settings.actions.media.cache_to_redis
      in_memory_file = nil
      redis_cache_key = nil
      if should_check_redis_cache
        redis_cache_key = Pathname(expected_files.first).basename.to_s
        in_memory_file = BawWorkers::IO.new_binary_string_io(capacity: 200_000)
      end
      Rails.logger.debug { { message: 'fast cache', should_check_redis_cache: should_check_redis_cache } }

      poll_result = poll(wait_max, poll_delay) do
        resque_status = BawWorkers::ActiveJob::Status::Persistance.get(job_id)

        if should_check_redis_cache
          result = BawWorkers::Config.redis_communicator.get_file(redis_cache_key, in_memory_file)
          # a positive result stops the poll loop so the cached file can be sent back
          next true if result&.positive?
        end

        existing_files = get_existing_files(poll_locations)
        # done when the job reports completion or a file is on disk
        (resque_status.present? && resque_status.completed?) || !existing_files.empty?
      end

      if poll_result[:result].nil?
        resque_task_status = resque_status.nil? ? '(none)' : resque_status.status
        msg = "Polling expired after #{wait_max} seconds with #{poll_delay} seconds delay with resque job status '#{resque_task_status}'. Could not find media files."
        job_info = poll_result.merge(
          uuid: resque_status&.job_id,
          time: resque_status&.time,
          status: resque_task_status,
          poll_locations: poll_locations,
          existing_files: existing_files
        )
        raise CustomErrors::AudioGenerationError.new(msg, job_info)
      end

      {
        in_memory_file: in_memory_file&.string,
        existing_files: existing_files,
        resque_status: resque_status
      }
    end

    # Build the list of locations to poll, dropping anything that is not an
    # acceptable absolute path. This only inspects strings; it does not touch
    # the filesystem.
    # @param [Array<String>] files candidate paths (nil and non-String entries are skipped)
    # @return [Array<Hash>] one { dir:, file: } hash per valid, cleaned absolute path
    def prepare_locations(files)
      files.filter_map do |raw_file|
        # Regexp#=== is false for nil and for objects that are not strings
        next unless VALID_PATH_PATTERN === raw_file

        file = Pathname.new(raw_file).cleanpath
        next if file.relative?

        { dir: file.dirname.to_s, file: file.to_s }
      end
    end

    # Return the first location whose file currently exists as a regular file.
    # Locations are checked in order and checking stops at the first hit, so
    # the result contains at most one path.
    #
    # NFS note: directory attribute caching on NFS mounts can delay visibility
    # of new files. Refreshing the cache by running `ls -la` on the directory
    # was considered and must NOT be enabled here; tuning the mount's attribute
    # cache (e.g. 'actimeo=3', see the NFS man page) is the suggested alternative.
    # @param [Array<Hash>] poll_locations as returned by prepare_locations
    # @return [Array<String>] empty, or a single existing file path
    def get_existing_files(poll_locations)
      found = poll_locations.find do |location|
        file = location[:file]
        Rails.logger.debug "MediaPoll#get_existing_files: checking #{file}"
        # File.file? is false for missing paths as well as for directories
        File.file?(file)
      end
      return [] if found.nil?

      Rails.logger.debug "MediaPoll#get_existing_files: FOUND #{found[:file]}"
      [found[:file]]
    end

    private

    # Repeatedly call the block until it returns a truthy value or time runs out.
    # Based on the Firepoll gem. The block is always called at least once, and
    # is called one final time at the deadline. The deadline uses a monotonic
    # clock, so wall-clock adjustments do not shorten or extend the wait.
    # @param [Numeric] seconds overall time to keep polling (nil means 2.0)
    # @param [Numeric] delay seconds to sleep between attempts (nil means 0.1)
    # @yield called once per attempt
    # @yieldreturn truthy to stop polling, falsy to keep polling
    # @return [Hash] :result (the block's truthy value, or nil on timeout),
    #   :max_poll_time, :max_poll_datetime (wall-clock deadline), :delay,
    #   :actual_poll_time (elapsed seconds)
    def poll(seconds = 2.0, delay = 0.1)
      seconds ||= 2.0
      delay ||= 0.1
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      deadline = started_at + seconds

      result_value = {
        result: nil,
        max_poll_time: seconds,
        max_poll_datetime: Time.now + seconds,
        delay: delay
      }

      loop do
        result = yield
        if result
          result_value[:result] = result
          break
        end

        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break unless remaining.positive?

        # never sleep past the deadline; the next iteration is the final check
        sleep [delay, remaining].min
      end

      result_value[:actual_poll_time] = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
      result_value
    end
  end
end