QutBioacoustics/baw-server

View on GitHub
lib/gems/baw-workers/lib/baw_workers/jobs/analysis/status.rb

Summary

Maintainability
A
1 hr
Test Coverage
# frozen_string_literal: true

module BawWorkers
  module Jobs
    module Analysis
      # Integrates an Analysis action with our tracking systems
      # The broader resque:status concerns are handled higher up the hierarchy
      class Status
        def initialize(api_communicator)
          @api_communicator = api_communicator

          # sign into the website
          @security_info = get_security_info
        end

        # @param [Hash] params
        def begin(params)
          # is system job? then ignore - we have no status tracking
          return if should_not_process?(params)

          analysis_job_id = params[:job_id]
          audio_recording_id = params[:id]

          # check if job has been killed by website tracking
          cancel_result = try_update('Could not check AnalysisJobsItems status.') {
            @api_communicator.get_analysis_jobs_item_status(
              analysis_job_id,
              audio_recording_id,
              @security_info
            )
          }
          cancelled_by_website = cancel_result[:status].nil? ? false : cancel_result[:status].to_sym == :cancelling

          # if it has been cancelled
          if cancelled_by_website
            # raise an action cancelled exception - it will be caught by action_perform
            BawWorkers::Config.logger_worker.warn(self.class.name) do
              'The website cancelled this analysis job'
            end
            raise BawWorkers::Exceptions::ActionCancelledError, cancel_result[:response_json]
          end

          working_json = try_update('Could not update AnalysisJobsItems status to :working.') {
            @api_communicator.update_analysis_jobs_item_status(
              analysis_job_id,
              audio_recording_id,
              :working,
              @security_info
            )
          }

          raise  BawWorkers::Exceptions::AnalysisEndpointError, working_json[:response] if working_json[:failed]
        end

        # Update the tracking system. At this point the status is either cancelled, timed_out, successful, or failed
        # @param [Symbol] status
        # @param [Hash] params
        def end(params, status)
          # is system job? then ignore - we have no status tracking
          return if should_not_process?(params)

          analysis_job_id = params[:job_id]
          audio_recording_id = params[:id]

          # NB: no need to update redis status. It does it automatically

          try_update("Could not update AnalysisJobsItems status to #{status}.") do
            @api_communicator.update_analysis_jobs_item_status(analysis_job_id, audio_recording_id, status,
              @security_info)
          end
        end

        def self.retry_attempts
          4
        end

        def try_update(*context)
          retry_attempts = self.class.retry_attempts
          attempts_left = retry_attempts
          failed = false
          errors = []
          while attempts_left.positive?
            # = 0.0, ~1.718, ~6.389, ~19.085
            back_off = Math.exp(retry_attempts - attempts_left) - 1
            sleep(back_off)

            # update website with desired status
            # the API communicator heavily logs what is is doing
            result = nil
            begin
              result = yield
            rescue StandardError => e
              errors << e
              failed = true
            end

            return result unless failed || result[:failed]

            attempts_left -= 1
            @api_communicator.logger.warn(self.class.name) {
              "AnalysisJobItem status update failed, trying again, #{attempts_left} attempts left"
            }
          end

          # the web request has failed multiple times
          error = BawWorkers::Exceptions::AnalysisEndpointError.new("#{context}\n#{errors.map(&:message).join("\n")}")
          raise error
        end

        def get_security_info
          security_info = try_update('Could not log in.') {
            @api_communicator.request_login
          }
        end

        def should_not_process?(params)
          !params || params[:job_id].to_s.strip.downcase == BawWorkers::Storage::AnalysisCache::JOB_ID_SYSTEM
        end
      end
    end
  end
end