QutBioacoustics/baw-server

View on GitHub
lib/gems/baw-workers/lib/baw_workers/jobs/analysis/runner.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# frozen_string_literal: true

module BawWorkers
  module Jobs
    module Analysis
      # Run an Analysis action.
      class Runner
        # File name of the per-run config file. It is written into the run dir
        # and copied to the output dir after the command has been executed.
        FILE_CONFIG = 'run.config'

        # File name of the per-run worker log. It is created in the run dir
        # and copied to the output dir after the command has been executed.
        FILE_LOG = 'worker.log'

        # Marker file touched in the output dir when execution recorded no error.
        FILE_SUCCESS = 'job.success'

        # Marker file touched in the output dir when execution recorded an error.
        FILE_EXECUTABLE_FAILURE = 'job.analysis_failure'

        # Marker file touched in the output dir when a run is prepared,
        # and removed again once execution has finished.
        FILE_WORKER_STARTED = 'job.started'

        # Directory name, inside a run dir, holding the copied programs.
        DIR_PROGRAMS = 'programs'

        # Directory name, under the worker's top-level dir, containing one dir per run.
        DIR_RUNS = 'runs'

        # Temporary directory name inside each run dir.
        DIR_TEMP = 'temp'

        # Create a new Runner.
        # @param [BawWorkers::Storage::AudioOriginal] original_store
        # @param [BawWorkers::Storage::AnalysisCache] analysis_cache
        # @param [Logger] logger
        # @param [String] dir_worker_top
        # @param [String] dir_programs
        # @return [BawWorkers::Jobs::Analysis::Runner]
        def initialize(original_store, analysis_cache, logger, dir_worker_top, dir_programs)
          # stores used to find source audio and to work out the output dir
          @original_store, @analysis_cache = original_store, analysis_cache
          @logger = logger

          # top-level working dir for runs, and the programs dir copied into each run
          @dir_worker_top, @dir_programs = dir_worker_top, dir_programs

          # passed as the progname on every log call made by this instance
          @class_name = self.class.name
        end

        # Prepare for a new run.
        # @param [Hash] opts
        # @return [Hash] settings for running worker
        def prepare(opts)
          runner = BawWorkers::Jobs::Analysis::Runner
          payload = BawWorkers::Jobs::Analysis::Payload

          BawWorkers::Validation.check_custom_hash(opts, payload::OPTS_FIELDS)
          opts[:datetime_with_offset] = BawWorkers::Validation.normalise_datetime(opts[:datetime_with_offset])

          # flag the job as started in the output dir before doing anything else
          dir_output = create_output_dir(opts)
          FileUtils.touch(File.join(dir_output, FILE_WORKER_STARTED))

          # build the run dir, then give it its own copy of the programs
          run_info = create_run_info(opts)
          dir_run = run_info[:dir_run]
          dir_run_programs = copy_programs(dir_run)

          # values available to the command format placeholders
          # (filled in this order: locating the source may raise before the config is written)
          command_opts = {}
          command_opts[:file_source] = get_file_source(opts)
          command_opts[:file_executable] = get_file_executable(dir_run_programs, opts)
          command_opts[:dir_output] = dir_output
          command_opts[:file_config] = create_config_file(dir_run, opts)
          command_opts[:dir_run] = dir_run
          command_opts[:dir_temp] = run_info[:dir_run_temp]

          # validate the placeholder values and the format, then build the command string
          BawWorkers::Validation.check_custom_hash(command_opts, payload::COMMAND_PLACEHOLDERS)
          runner.check_command_format(opts)
          command_opts[:command] = runner.format_command(opts, command_opts)

          # the worker log path is added last, after the placeholder hash was validated
          command_opts[:file_run_log] = run_info[:file_run_log]

          command_opts
        end

        # Execute a command with working directory information.
        # @param [Hash] prepared_opts
        # @param [Hash] opts
        # @return [Hash] external program execution result hash
        def execute(prepared_opts, opts)
          # Runs the prepared command inside the run dir, copies the requested files
          # (plus the worker log and config) to the output dir, writes a success or
          # failure marker, removes the started marker and deletes the run dir.
          # The returned hash is the external program result with these keys added:
          # :error (only when execution raised), :copy_results, :output_files_created,
          # :command_format, :dir_run and :dir_output.
          BawWorkers::Validation.check_custom_hash(prepared_opts,
                                                   BawWorkers::Jobs::Analysis::Payload::COMMAND_PLACEHOLDERS)
          BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Jobs::Analysis::Payload::OPTS_FIELDS)
          BawWorkers::Jobs::Analysis::Runner.check_command_format(opts)

          timeout_sec = self.class.timeout_seconds
          log_file = prepared_opts[:file_run_log]
          dir_run = prepared_opts[:dir_run]
          dir_output = prepared_opts[:dir_output]
          command = prepared_opts[:command]

          current_wd = Dir.pwd

          # log to both the worker logger and a per-run log file
          run_log = Logger.new(log_file)
          logger = BawWorkers::MultiLogger.new(@logger, run_log)

          external_program = BawAudioTools::RunExternalProgram.new(timeout_sec, logger)
          result = {}

          begin
            logger.info(@class_name) { "Executing #{command}." }

            # the command runs with the run dir as its working directory
            Dir.chdir(dir_run)
            result = external_program.execute(command, true)
          rescue StandardError => e
            # a failed execution is reported in the result rather than re-raised
            logger.error(@class_name) { "Error executing #{command}: #{e.inspect}." }
            result[:error] = e
          else
            logger.debug(@class_name) { "Successfully executed #{result}." }
          ensure
            Dir.chdir(current_wd)
            # Release the per-run log file handle: nothing logs to it after this point,
            # and the file is copied and its directory deleted below.
            run_log.close
          end

          # Add the standard files to the paths to copy. A new array is built so the
          # array object supplied by the caller is not modified in place.
          requested_paths = opts[:copy_paths]
          requested_paths = [] if requested_paths.blank?
          requested_paths = [requested_paths] unless requested_paths.respond_to?(:each)
          opts[:copy_paths] = [*requested_paths, FILE_LOG, FILE_CONFIG]

          # copy files after command is executed
          result[:copy_results] = copy_custom(dir_run, dir_output, opts)

          # record which marker file was created in the output dir
          marker_name = result[:error].present? ? FILE_EXECUTABLE_FAILURE : FILE_SUCCESS
          FileUtils.touch(File.join(dir_output, marker_name))
          result[:output_files_created] = [marker_name]

          # remove worker started file
          started_file = File.join(dir_output, FILE_WORKER_STARTED)
          File.delete(started_file) if File.exist?(started_file)

          # include command format
          result[:command_format] = opts[:command_format]

          # finally delete the run directory (its path is still reported in the result)
          result[:dir_run] = dir_run
          delete_run_dir(dir_run)

          # include the output directory in results
          result[:dir_output] = dir_output

          result
        end

        # Timeout, in seconds, given to the external program runner (4 hours).
        def self.timeout_seconds
          hours = 4
          seconds_per_hour = 60 * 60

          hours * seconds_per_hour
        end

        # Create directory for a run.
        # @param [Hash] opts
        # @return [Hash] paths
        def create_run_info(opts)
          BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Jobs::Analysis::Payload::OPTS_FIELDS)

          # suffix is the current UTC time in ISO 8601 form, lower-cased,
          # with every non-alphanumeric character replaced by an underscore
          timestamp = Time.zone.now.utc.iso8601.to_s.downcase.gsub(/[^a-z0-9]/i, '_')

          dir_run = File.join(@dir_worker_top, DIR_RUNS, "#{opts[:job_id]}_#{opts[:id]}_#{timestamp}")

          paths = {
            dir_run: dir_run,
            dir_run_temp: File.join(dir_run, DIR_TEMP),
            file_run_log: File.join(dir_run, FILE_LOG)
          }

          # only the directories are created here; the log file path is just reported
          FileUtils.mkpath(paths.values_at(:dir_run, :dir_run_temp))

          paths
        end

        # Create directory for run results.
        # @param [Hash] opts
        # @return [String] output dir
        def create_output_dir(opts)
          BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Jobs::Analysis::Payload::OPTS_FIELDS)

          # ask the analysis cache where results for this job and uuid may be stored
          store_query = { job_id: opts[:job_id], uuid: opts[:uuid], sub_folders: [] }
          candidate_dirs = @analysis_cache.possible_paths_dir(store_query)

          # the first candidate is used as the output dir
          first_candidate = BawWorkers::Validation.normalise_path(candidate_dirs.first, nil)
          output_dir = File.expand_path(first_candidate)

          FileUtils.mkpath(output_dir)

          output_dir
        end

        # Save config to file.
        # @param [String] dir_run
        # @param [Hash] opts
        # @return [String] config file path
        def create_config_file(dir_run, opts = {})
          BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Jobs::Analysis::Payload::OPTS_FIELDS)

          # the config is always stored under a fixed name directly inside the run dir;
          # any existing file is overwritten
          File.join(dir_run, BawWorkers::Jobs::Analysis::Runner::FILE_CONFIG).tap do |config_path|
            File.write(config_path, opts[:config])
          end
        end

        # Get absolute path to source file.
        # @param [Hash] opts
        # @return [String] source file
        def get_file_source(opts)
          BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Jobs::Analysis::Payload::OPTS_FIELDS)

          found_paths = @original_store.existing_paths(opts)

          # the last existing path is the one that gets used
          unless found_paths.empty?
            return File.expand_path(BawWorkers::Validation.normalise_path(found_paths.last, nil))
          end

          # nothing on disk: report every location that was considered
          candidate_paths = @original_store.possible_paths(opts)
          msg = "No original audio files found in #{candidate_paths.join(', ')} using #{opts.to_json}."
          @logger.error(@class_name) { msg }
          raise BawAudioTools::Exceptions::AudioFileNotFoundError, msg
        end

        # Get absolute path to executable.
        # @param [String] dir_run_programs
        # @param [Hash] opts
        # @return [String] executable path
        def get_file_executable(dir_run_programs, opts = {})
          BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Jobs::Analysis::Payload::OPTS_FIELDS)

          # opts[:file_executable] is resolved against the run's copy of the programs dir
          BawWorkers::Validation.normalise_path(opts[:file_executable], dir_run_programs)
        end

        # Copy programs directory to run directory.
        # @param [String] dir_run
        # @return [String] programs dir for a run
        def copy_programs(dir_run)
          # Copies the contents of the configured programs dir into
          # <dir_run>/<DIR_PROGRAMS> and returns that path.
          # Raises ArgumentError when the configured programs dir does not exist.
          src = @dir_programs.to_s
          raise ArgumentError, "programs path does not exist #{src}" unless Dir.exist?(src)

          dest = BawWorkers::Validation.normalise_path(dir_run, @dir_worker_top)
          dir_run_programs = BawWorkers::Validation.normalise_path(
            File.join(dest, BawWorkers::Jobs::Analysis::Runner::DIR_PROGRAMS),
            @dir_worker_top
          )

          # Copy the *contents* ('src/.') into an explicitly named destination, so the
          # returned path exists whatever the configured programs dir is called.
          # (cp_r(src, dest) would create dest/<basename of src> instead.)
          FileUtils.mkpath(dir_run_programs)
          FileUtils.cp_r(File.join(src, '.'), dir_run_programs)

          dir_run_programs
        end

        # Copy custom paths from the run dir to the top level of the output dir
        # @param [String] dir_run
        # @param [String] dir_output
        # @param [Hash] opts
        # @return [Array<Hash>] copy results
        def copy_custom(dir_run, dir_output, opts = {})
          BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Jobs::Analysis::Payload::OPTS_FIELDS)

          # a single path is treated as a one-element list
          requested = opts[:copy_paths]
          requested = [requested] unless requested.respond_to?(:each)

          outcomes = []

          requested.each do |path|
            outcome = { error: nil, source: nil, destination: nil }

            begin
              outcome[:source] = BawWorkers::Validation.normalise_path(path, dir_run)

              # files land at the top level of the output dir, whatever their depth in the run dir
              outcome[:destination] = BawWorkers::Validation.normalise_path(File.basename(path), dir_output)
              FileUtils.cp(outcome[:source], outcome[:destination])
            rescue StandardError => e
              # a failed copy is recorded for this path and does not stop the others
              outcome[:error] = e
            end

            outcomes << outcome
          end

          outcomes
        end

        # Delete the run directory
        # @param [String] run_dir
        # @return [void]
        def delete_run_dir(run_dir)
          # Recursively deletes run_dir, but only when it is strictly inside the runs dir.
          # Raises ArgumentError otherwise.
          runs_dir = File.expand_path(File.join(@dir_worker_top, BawWorkers::Jobs::Analysis::Runner::DIR_RUNS))
          target = File.expand_path(run_dir)

          # Compare expanded paths against a separator-terminated prefix. A bare
          # string-prefix check would also accept siblings such as "runs_other",
          # ".." traversal out of the runs dir, and the runs dir itself.
          unless target.start_with?("#{runs_dir}#{File::SEPARATOR}")
            raise ArgumentError, "dir must be in runs dir, given #{run_dir}"
          end

          FileUtils.rm_rf(target)
        end

        # Ensure command format has required placeholders
        # @param [Hash] opts
        # @return [void]
        def self.check_command_format(opts)
          # Placeholders use the custom <{NAME}> syntax rather than sprintf, so a
          # command format cannot do more than substitute known values. A format may
          # use any subset of COMMAND_PLACEHOLDERS, but nothing outside that list.
          found = BawWorkers::Jobs::Analysis::Runner.extract_command_placeholders(opts)
          permitted = BawWorkers::Jobs::Analysis::Payload::COMMAND_PLACEHOLDERS

          found.each do |name|
            next if permitted.include?(name)

            raise ArgumentError, "Command #{opts[:command_format]} can only contain #{permitted.join(', ')}."
          end
        end

        # Format command string.
        # @param [Hash] opts
        # @param [Hash] command_opts
        # @return [String] formatted command
        def self.format_command(opts, command_opts)
          # Returns a new string in which every <{NAME}> placeholder of
          # opts[:command_format] is replaced by command_opts[:name].
          # opts[:command_format] itself is not modified.
          # Raises ArgumentError when a placeholder has no value or is not allowed.
          command_format = opts[:command_format]

          command_placeholders = BawWorkers::Jobs::Analysis::Runner.extract_command_placeholders(opts)
          allowed_placeholders = BawWorkers::Jobs::Analysis::Payload::COMMAND_PLACEHOLDERS

          # validate every placeholder before substituting anything
          command_placeholders.each do |command_placeholder|
            if !command_opts.include?(command_placeholder) ||
               command_opts[command_placeholder].blank?
              raise ArgumentError, "Value not supplied for placeholder #{command_placeholder} in #{command_format}."
            end

            unless allowed_placeholders.include?(command_placeholder)
              raise ArgumentError, "Placeholder #{command_placeholder} is not allowed in #{command_format}."
            end
          end

          # Substitute in a single pass, using the same pattern and name normalisation
          # as extract_command_placeholders:
          # - matching is case-insensitive, so <{FILE_SOURCE}> is replaced as well;
          # - the block form inserts values literally (a replacement *string* would
          #   have its backslash sequences such as \\ and \0 interpreted);
          # - text inside an inserted value is never expanded again.
          # NOTE(review): values are inserted verbatim, with no shell escaping -
          # confirm that the command formats quote placeholders where needed.
          command_format.gsub(/<{.*?}>/i) do |placeholder|
            command_opts[placeholder[2..-3].downcase.to_sym].to_s
          end
        end

        def self.extract_command_placeholders(opts)
          # Returns the placeholder names found in opts[:command_format], in order of
          # appearance (repeats included), lower-cased and converted to symbols.
          # A placeholder is written as <{NAME}>.
          tokens = opts[:command_format].scan(/<{.*?}>/i)

          tokens.map do |token|
            # drop the two delimiter characters on each side of the name
            token[2...-2].downcase.to_sym
          end
        end
      end
    end
  end
end