QutBioacoustics/baw-workers

View on GitHub
lib/baw-workers/analysis/runner.rb

Summary

Maintainability
A
3 hrs
Test Coverage
module BawWorkers
  module Analysis

    # Run an Analysis action.
    class Runner

      # File name for config file per run.
      FILE_CONFIG = 'run.config'.freeze

      # File name for worker log file per run.
      FILE_LOG = 'worker.log'.freeze

      # Marker file created in the output directory when the job succeeded.
      FILE_SUCCESS = 'job.success'.freeze

      # Marker file created in the output directory when the executable failed.
      FILE_EXECUTABLE_FAILURE = 'job.analysis_failure'.freeze

      # Marker file created in the output directory when a worker began processing the job.
      FILE_WORKER_STARTED = 'job.started'.freeze

      # Directory name for programs that can be run.
      DIR_PROGRAMS = 'programs'.freeze

      # Directory (under the worker top directory) containing files during a run.
      DIR_RUNS = 'runs'.freeze

      # Temporary directory in each run directory.
      DIR_TEMP = 'temp'.freeze

      # Create a new analysis Runner.
      # @param [BawWorkers::Storage::AudioOriginal] original_store store queried for source audio paths
      # @param [BawWorkers::Storage::AnalysisCache] analysis_cache store queried for output directory paths
      # @param [Logger] logger
      # @param [String] dir_worker_top directory under which run directories are created
      # @param [String] dir_programs programs directory that is copied into each run directory
      # @return [BawWorkers::Analysis::Runner]
      def initialize(original_store, analysis_cache, logger, dir_worker_top, dir_programs)
        @original_store, @analysis_cache = original_store, analysis_cache
        @logger = logger
        @dir_worker_top, @dir_programs = dir_worker_top, dir_programs

        # kept so log calls can pass the class name as the progname
        @class_name = self.class.name
      end

      # Prepare for a new run.
      #
      # In order: validates opts, normalises opts[:datetime_with_offset] in place,
      # creates the output directory and touches the started marker file in it,
      # creates the run directory, copies the programs directory into it, writes
      # the config file and formats the command string.
      # @param [Hash] opts
      # @return [Hash] settings for running worker (command placeholders plus
      #   :command and :file_run_log)
      def prepare(opts)
        BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Analysis::Payload::OPTS_FIELDS)

        opts[:datetime_with_offset] = BawWorkers::Validation.normalise_datetime(opts[:datetime_with_offset])

        # the started marker goes into the output dir before any other work is done
        dir_output = create_output_dir(opts)
        FileUtils.touch(File.join(dir_output, FILE_WORKER_STARTED))

        # run dir, its temp dir and the path for the per-run log
        run_info = create_run_info(opts)
        dir_run = run_info[:dir_run]

        # each run gets its own copy of the programs directory
        dir_run_programs = copy_programs(dir_run)

        # values are filled in the same order the placeholders are listed
        command_opts = {}
        command_opts[:file_source] = get_file_source(opts)
        command_opts[:file_executable] = get_file_executable(dir_run_programs, opts)
        command_opts[:dir_output] = dir_output
        command_opts[:file_config] = create_config_file(dir_run, opts)
        command_opts[:dir_run] = dir_run
        command_opts[:dir_temp] = run_info[:dir_run_temp]

        # check placeholder values and the command format, then build the command string
        BawWorkers::Validation.check_custom_hash(command_opts, BawWorkers::Analysis::Payload::COMMAND_PLACEHOLDERS)
        BawWorkers::Analysis::Runner.check_command_format(opts)
        command_opts[:command] = BawWorkers::Analysis::Runner.format_command(opts, command_opts)

        # include path to worker log file
        command_opts[:file_run_log] = run_info[:file_run_log]

        command_opts
      end

      # Execute a command with working directory information.
      #
      # In order: runs the command from inside the run directory (logging to both
      # the worker logger and the per-run log file), copies opts[:copy_paths] plus
      # the run log and config file to the output directory, creates a success or
      # failure marker file, removes the started marker file and deletes the run
      # directory.
      #
      # An error raised while running the command is not re-raised; it is logged
      # and stored in the result under :error.
      # @param [Hash] prepared_opts settings as produced by #prepare
      #   (reads :file_run_log, :dir_run, :dir_output and :command)
      # @param [Hash] opts opts[:copy_paths] is replaced with a new array that
      #   also contains the run log and config file names
      # @return [Hash] external program execution result hash, with :copy_results,
      #   :output_files_created, :command_format, :dir_run and :dir_output added
      def execute(prepared_opts, opts)
        BawWorkers::Validation.check_custom_hash(prepared_opts, BawWorkers::Analysis::Payload::COMMAND_PLACEHOLDERS)
        BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Analysis::Payload::OPTS_FIELDS)
        BawWorkers::Analysis::Runner.check_command_format(opts)

        timeout_sec = self.class.timeout_seconds
        log_file = prepared_opts[:file_run_log]
        dir_run = prepared_opts[:dir_run]
        dir_output = prepared_opts[:dir_output]
        command = prepared_opts[:command]

        current_wd = Dir.pwd
        run_log = Logger.new(log_file)
        logger = BawWorkers::MultiLogger.new(@logger, run_log)

        external_program = BawAudioTools::RunExternalProgram.new(timeout_sec, logger)
        result = {}

        begin
          logger.info(@class_name) { "Executing #{command}." }

          # change to run dir
          Dir.chdir(dir_run)
          result = external_program.execute(command, true)
        rescue => e
          logger.error(@class_name) { "Error executing #{command}: #{e.inspect}." }
          result[:error] = e
        else
          # run if no error
          logger.debug(@class_name) { "Successfully executed #{result}." }
        ensure
          Dir.chdir(current_wd)
          # nothing logs to the run log after this point; close it so the file
          # handle is released before the log is copied and the run dir deleted
          run_log.close
        end

        # add standard paths to copy; build a new array so the array supplied
        # by the caller is not modified
        copy_paths = opts[:copy_paths]
        copy_paths = [] if copy_paths.blank?
        copy_paths = [copy_paths] unless copy_paths.respond_to?(:each)
        opts[:copy_paths] = copy_paths.dup.push(
            BawWorkers::Analysis::Runner::FILE_LOG,
            BawWorkers::Analysis::Runner::FILE_CONFIG
        )

        # copy files after command is executed
        result[:copy_results] = copy_custom(dir_run, dir_output, opts)

        # files created
        result[:output_files_created] = []

        # create result file
        if result.include?(:error) && !result[:error].blank?
          # create failure file
          executable_failure_file = File.join(dir_output, FILE_EXECUTABLE_FAILURE)
          FileUtils.touch(executable_failure_file)
          result[:output_files_created].push(FILE_EXECUTABLE_FAILURE)
        else
          # create success file
          success_file = File.join(dir_output, FILE_SUCCESS)
          FileUtils.touch(success_file)
          result[:output_files_created].push(FILE_SUCCESS)
        end

        # remove worker started file
        # (File.exist? is used because File.exists? was removed in Ruby 3.2)
        started_file = File.join(dir_output, FILE_WORKER_STARTED)
        File.delete(started_file) if File.exist?(started_file)

        # include command format
        result[:command_format] = opts[:command_format]

        # finally delete the run directory
        result[:dir_run] = dir_run
        delete_run_dir(dir_run)

        # include the output directory in results
        result[:dir_output] = dir_output

        result
      end

      # Timeout, in seconds, handed to the external program runner: 4 hours.
      # @return [Integer]
      def self.timeout_seconds
        seconds_per_hour = 60 * 60
        4 * seconds_per_hour
      end

      # Create the directory for a run, along with its temp directory.
      # The run directory is named "<job_id>_<id>_<timestamp>" and lives under
      # the runs directory of the worker top directory. The timestamp is the
      # current UTC time in ISO 8601 form, lower-cased, with every character
      # that is not a letter or digit replaced by an underscore.
      # @param [Hash] opts
      # @return [Hash] paths (:dir_run, :dir_run_temp, :file_run_log)
      def create_run_info(opts)
        BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Analysis::Payload::OPTS_FIELDS)

        timestamp = Time.zone.now.utc.iso8601.to_s.downcase.gsub(/[^a-z0-9]/i, '_')
        run_name = "#{opts[:job_id]}_#{opts[:id]}_#{timestamp}"

        runs_root = File.join(@dir_worker_top, BawWorkers::Analysis::Runner::DIR_RUNS)
        dir_run = File.join(runs_root, run_name)

        paths = {
            dir_run: dir_run,
            dir_run_temp: File.join(dir_run, BawWorkers::Analysis::Runner::DIR_TEMP),
            file_run_log: File.join(dir_run, BawWorkers::Analysis::Runner::FILE_LOG)
        }

        # only the directories are created here; the log file is created by whoever opens it
        FileUtils.mkdir_p([paths[:dir_run], paths[:dir_run_temp]])

        paths
      end

      # Create directory for run results.
      # The analysis cache is asked for the possible output directories for
      # opts[:job_id] and opts[:uuid] (with no sub folders); the first one is
      # normalised, expanded and created.
      # @param [Hash] opts
      # @return [String] output dir
      def create_output_dir(opts)
        BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Analysis::Payload::OPTS_FIELDS)

        candidates = @analysis_cache.possible_paths_dir({
            job_id: opts[:job_id],
            uuid: opts[:uuid],
            sub_folders: []
        })

        first_candidate = BawWorkers::Validation.normalise_path(candidates.first, nil)

        File.expand_path(first_candidate).tap { |dir| FileUtils.mkdir_p(dir) }
      end

      # Write opts[:config] to the config file inside the run directory,
      # replacing any existing content.
      # @param [String] dir_run
      # @param [Hash] opts
      # @return [String] config file path
      def create_config_file(dir_run, opts = {})
        BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Analysis::Payload::OPTS_FIELDS)

        File.join(dir_run, BawWorkers::Analysis::Runner::FILE_CONFIG).tap do |config_path|
          File.write(config_path, opts[:config])
        end
      end

      # Get absolute path to source file.
      # Uses the first existing path reported by the original audio store.
      # @param [Hash] opts
      # @return [String] source file
      # @raise [BawAudioTools::Exceptions::AudioFileNotFoundError] when the store
      #   reports no existing paths (the error is also written to the logger)
      def get_file_source(opts)
        BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Analysis::Payload::OPTS_FIELDS)

        found = @original_store.existing_paths(opts)

        unless found.empty?
          return File.expand_path(BawWorkers::Validation.normalise_path(found.first, nil))
        end

        # nothing on disk: report every location that was considered
        candidates = @original_store.possible_paths(opts)
        msg = "No original audio files found in #{candidates.join(', ')} using #{opts.to_json}."
        @logger.error(@class_name) { msg }
        fail BawAudioTools::Exceptions::AudioFileNotFoundError, msg
      end

      # Get path to executable: opts[:file_executable] normalised against the
      # programs directory of the run.
      # @param [String] dir_run_programs
      # @param [Hash] opts
      # @return [String] executable path
      def get_file_executable(dir_run_programs, opts = {})
        BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Analysis::Payload::OPTS_FIELDS)
        BawWorkers::Validation.normalise_path(opts[:file_executable], dir_run_programs)
      end

      # Copy programs directory to run directory.
      # NOTE(review): the returned path is always <run dir>/programs, which
      # assumes the source directory itself is named 'programs' - confirm.
      # @param [String] dir_run
      # @return [String] programs dir for a run
      # @raise [ArgumentError] when the source programs directory does not exist
      def copy_programs(dir_run)
        src = @dir_programs
        # Dir.exist? is used because Dir.exists? was removed in Ruby 3.2
        fail ArgumentError, "programs path does not exist #{src}" unless Dir.exist?(src)

        dest = BawWorkers::Validation.normalise_path(dir_run, @dir_worker_top)
        FileUtils.cp_r(src.to_s, dest)

        dir_run_programs = File.join(dest, BawWorkers::Analysis::Runner::DIR_PROGRAMS)
        BawWorkers::Validation.normalise_path(dir_run_programs, @dir_worker_top)
      end

      # Copy each path in opts[:copy_paths] from the run dir to the top level of
      # the output dir. A single (non-enumerable) value is treated as one path.
      # A failure for one path is recorded in that path's result and does not
      # stop the remaining paths from being copied.
      # @param [String] dir_run
      # @param [String] dir_output
      # @param [Hash] opts
      # @return [Array<Hash>] copy results, one per path, with :error (nil on
      #   success), :source and :destination
      def copy_custom(dir_run, dir_output, opts = {})
        BawWorkers::Validation.check_custom_hash(opts, BawWorkers::Analysis::Payload::OPTS_FIELDS)

        paths = opts[:copy_paths]
        paths = [paths] unless paths.respond_to?(:each)

        outcomes = []

        paths.each do |path|
          # stay nil when the step that would set them raised
          src = dest = failure = nil

          begin
            src = BawWorkers::Validation.normalise_path(path, dir_run)

            # only the file name is kept, so everything lands directly in the output dir
            dest = BawWorkers::Validation.normalise_path(File.basename(path), dir_output)
            FileUtils.cp(src, dest)
          rescue => e
            failure = e
          end

          outcomes << { error: failure, source: src, destination: dest }
        end

        outcomes
      end

      # Delete the run directory.
      # Both paths are expanded before they are compared, and the directory
      # must be strictly below the runs dir. A plain string-prefix check would
      # also accept sibling directories (e.g. 'runs_other'), paths that climb
      # out using '..', and the runs dir itself.
      # @param [String] run_dir
      # @return [void]
      # @raise [ArgumentError] when run_dir is empty or not underneath the runs dir
      def delete_run_dir(run_dir)
        runs_dir = File.expand_path(File.join(@dir_worker_top, BawWorkers::Analysis::Runner::DIR_RUNS))

        given = run_dir.to_s
        target = given.empty? ? nil : File.expand_path(given)

        # the trailing separator ensures only real descendants of the runs dir match
        unless target && target.start_with?(runs_dir + File::SEPARATOR)
          fail ArgumentError, "dir must be in runs dir, given #{run_dir}"
        end

        # delete the expanded path, i.e. exactly the path that was checked
        FileUtils.rm_rf(target)
      end

      # Ensure the command format only uses known placeholders.
      # Placeholders are written as <{PLACEHOLDER}>. Custom placeholders are used
      # rather than sprintf so the format string cannot do more than substitute
      # the values in COMMAND_PLACEHOLDERS. The format may use any subset of them.
      # @param [Hash] opts
      # @return [void]
      # @raise [ArgumentError] when the format contains a placeholder that is not allowed
      def self.check_command_format(opts)
        allowed = BawWorkers::Analysis::Payload::COMMAND_PLACEHOLDERS

        BawWorkers::Analysis::Runner.extract_command_placeholders(opts).each do |name|
          next if allowed.include?(name)

          fail ArgumentError, "Command #{opts[:command_format]} can only contain #{allowed.join(', ')}."
        end
      end

      # Format command string by replacing each <{PLACEHOLDER}> in
      # opts[:command_format] with the matching value from command_opts.
      #
      # Placeholder names are matched case-insensitively, the same way they are
      # extracted, so <{FILE_SOURCE}> and <{file_source}> are both replaced.
      # Values are inserted literally: backslashes in a value (e.g. a Windows
      # path) are not treated as replacement escapes.
      # NOTE(review): values are not shell-escaped; this assumes the command
      # format and the values come from trusted configuration - confirm.
      # @param [Hash] opts
      # @param [Hash] command_opts
      # @return [String] formatted command (opts[:command_format] is not modified)
      # @raise [ArgumentError] when a placeholder has no value or is not allowed
      def self.format_command(opts, command_opts)
        command_format = opts[:command_format].dup

        command_placeholders = BawWorkers::Analysis::Runner.extract_command_placeholders(opts)
        allowed_placeholders = BawWorkers::Analysis::Payload::COMMAND_PLACEHOLDERS

        # one pass per distinct name replaces every occurrence of it
        command_placeholders.uniq.each do |command_placeholder|

          if !command_opts.include?(command_placeholder) ||
              command_opts[command_placeholder].blank?
            fail ArgumentError, "Value not supplied for placeholder #{command_placeholder} in #{command_format}."
          end

          unless allowed_placeholders.include?(command_placeholder)
            fail ArgumentError, "Placeholder #{command_placeholder} is not allowed in #{command_format}."
          end

          placeholder_value = command_opts[command_placeholder].to_s
          placeholder_pattern = /<\{#{Regexp.escape(command_placeholder.to_s)}\}>/i

          # block form so that sequences such as \1 or \\ in the value are kept as-is
          command_format.gsub!(placeholder_pattern) { placeholder_value }
        end

        command_format
      end

      private

      # List the placeholder names used in opts[:command_format].
      # Placeholders are written as <{PLACEHOLDER}>; names are returned as
      # lower-cased symbols in order of appearance, duplicates included.
      # NOTE(review): a bare `private` has no effect on `def self.` methods, so
      # this method is still publicly callable.
      # @param [Hash] opts
      # @return [Array<Symbol>] placeholder names
      def self.extract_command_placeholders(opts)
        wrapped_names = opts[:command_format].scan(/<{.*?}>/i)

        wrapped_names.map do |wrapped|
          # drop the leading "<{" and the trailing "}>"
          wrapped[2...-2].downcase.to_sym
        end
      end

    end
  end
end