QutBioacoustics/baw-workers

View on GitHub
lib/baw-workers/analysis/status.rb

Summary

Maintainability
A
1 hr
Test Coverage
module BawWorkers
  module Analysis

    # Integrates an Analysis action with our tracking systems
    # The broader resque:status concerns are handled higher up the hierarchy
    class Status

      def initialize(api_communicator)

        @api_communicator = api_communicator

        # sign into the website
        @security_info = Status.get_security_info(@api_communicator)
      end

      # @param [Hash] params
      def begin(params)
        # is system job? then ignore - we have no status tracking
        return if should_not_process?(params)

        analysis_job_id = params[:job_id]
        audio_recording_id = params[:id]

        # check if job has been killed by website tracking
        cancel_result = @api_communicator.get_analysis_jobs_item_status(
            analysis_job_id,
            audio_recording_id,
            @security_info)
        cancelled_by_website = cancel_result[:status].nil? ? false : cancel_result[:status].to_sym == :cancelling


        # if it has been cancelled
        if cancelled_by_website
          # raise an action cancelled exception - it will be caught by action_perform
          BawWorkers::Config.logger_worker.warn(self.class.name) {
            "The website cancelled this analysis job"
          }
          raise BawWorkers::Exceptions::ActionCancelledError.new(cancel_result[:response_json])
        end

        working_json = @api_communicator.update_analysis_jobs_item_status(
            analysis_job_id,
            audio_recording_id,
            :working,
            @security_info)

        if working_json[:failed]
          raise AnalysisEndpointError.new(working_json[:response_json])
        end
      end


      # Update the tracking system. At this point the status is either cancelled, timed_out, successful, or failed
      # @param [Symbol] status
      # @param [Hash] params
      def end(params, status)
        # is system job? then ignore - we have no status tracking
        return if should_not_process?(params)

        analysis_job_id = params[:job_id]
        audio_recording_id = params[:id]

        # NB: no need to update resque status.
        # The `Resque:Plugins:Status::safe_perform!` method handles all of it's updates

        retry_attempts = self.class.retry_attempts
        attempts_left = retry_attempts
        failed = false
        while attempts_left > 0
          # = 0.0, ~1.718, ~6.389, ~19.085
          back_off = Math.exp(retry_attempts - attempts_left) - 1
          sleep(back_off)

          # update website with desired status
          # the API communicator heavily logs what is is doing
          begin
            result = @api_communicator.update_analysis_jobs_item_status(analysis_job_id, audio_recording_id, status, @security_info)
          rescue Timeout::Error => te
            # yeah we're squashing :-/ but this is all error handling code...
            failed = true
          end

          if failed || result[:failed]
            attempts_left = attempts_left - 1
            @api_communicator.logger.warn(self.class.name) {
              "AnalysisJobItem status update failed, trying again, #{attempts_left} attempts left"
            }
          else
            attempts_left = -1
          end
        end

        # the web request has failed multiple times
        if attempts_left == 0
          self.class.mail_error(params, status)
        end
      end

      private

      def self.retry_attempts
        4
      end

      def self.get_security_info(api_communicator)
        security_info = api_communicator.request_login

        if security_info.blank?
          msg = 'Could not log in.'
          @logger.error(@class_name) { msg }
          fail BawWorkers::Exceptions::AnalysisEndpointError, msg
        end

        security_info
      rescue => e
        self.class.mail_error(nil, nil, e)
        raise e
      end

      def should_not_process?(params)
        !params || params[:job_id].to_s.strip.downcase == BawWorkers::Storage::AnalysisCache::JOB_ID_SYSTEM
      end

      def self.mail_error(params, status, e = nil)
        BawWorkers::Mail::Mailer.send_worker_error_email(
            BawWorkers::Analysis::Status,
            {params: params, status: status},
            BawWorkers::Analysis::Action::queue,
            e || StandardError.new("Could not update AnalysisJobsItems status")
        )
      end
    end
  end
end