QutBioacoustics/baw-server

View on GitHub
lib/gems/baw-workers/lib/baw_workers/config.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# frozen_string_literal: true

module BawWorkers
  class Config
    class << self
      # Is this a resque_worker?
      #
      # The flag lives in an instance variable of Config itself and is only
      # assigned by `set`. An assignment written directly inside the
      # `class << self` body would land on the singleton class instead and
      # never be visible here, so the unset (nil) case is normalised to false.
      # @return [Boolean]
      def resque_worker?
        @is_resque_worker == true
      end

      # Is this a scheduler?
      #
      # The flag lives in an instance variable of Config itself and is only
      # assigned by `set`. An assignment written directly inside the
      # `class << self` body would land on the singleton class instead and
      # never be visible here, so the unset (nil) case is normalised to false.
      # @return [Boolean]
      def scheduler?
        @is_scheduler == true
      end

      # Was this process started by the baw-workers executable?
      #
      # The flag lives in an instance variable of Config itself and is only
      # assigned (to true) by `set`. An assignment written directly inside the
      # `class << self` body would land on the singleton class instead and
      # never be visible here, so the unset (nil) case is normalised to false.
      # @return [Boolean]
      def baw_workers_entry?
        @is_baw_workers_entry == true
      end

      # Logger named 'BawWorkers'; assigned by configure_loggers.
      # @return [::SemanticLogger::Logger]
      attr_reader :logger_worker

      # Logger named 'BawWorkers::Mailer'; assigned by configure_loggers.
      # @return [::SemanticLogger::Logger]
      attr_reader :logger_mailer

      # Logger named 'BawAudioTools'; assigned by configure_loggers.
      # @return [::SemanticLogger::Logger]
      attr_reader :logger_audio_tools

      # Temporary directory taken from settings.paths.temp_dir.
      # @return [Pathname]
      attr_reader :temp_dir

      # Top-level working directory; assigned by configure_paths.
      # @return [Pathname]
      attr_reader :worker_top_dir

      # Programs directory taken from the paths settings.
      # @return [Pathname]
      attr_reader :programs_dir

      # Assigned by configure_spectrogram_helper.
      # @return [BawAudioTools::Spectrogram]
      attr_reader :spectrogram_helper

      # Assigned by configure_audio_helper.
      # @return [BawAudioTools::AudioBase]
      attr_reader :audio_helper

      # Storage helper built from settings.paths.original_audios.
      # @return [BawWorkers::Storage::AudioOriginal]
      attr_reader :original_audio_helper

      # Storage helper built from settings.paths.cached_audios.
      # @return [BawWorkers::Storage::AudioCache]
      attr_reader :audio_cache_helper

      # Storage helper built from settings.paths.cached_spectrograms.
      # @return [BawWorkers::Storage::SpectrogramCache]
      attr_reader :spectrogram_cache_helper

      # Storage helper built from settings.paths.cached_analysis_jobs.
      # @return [BawWorkers::Storage::AnalysisCache]
      attr_reader :analysis_cache_helper

      # Built in run_web from the configured audio_helper.
      # @return [BawWorkers::FileInfo]
      attr_reader :file_info

      # Assigned by configure_api_communicator.
      # @return [BawWorkers::ApiCommunicator]
      attr_reader :api_communicator

      # Assigned by configure_redis.
      # @return [BawWorkers::RedisCommunicator]
      attr_reader :redis_communicator

      # Assigned by configure_upload_service.
      # @return [BawWorkers::UploadService::Communicator]
      attr_reader :upload_communicator

      # Adjust initialization context when started from rake task.
      # Calling this always marks the process as a baw-workers entry point.
      #
      # Both flags are compared strictly against `true`, so the default
      # `false` (and nil or any other value) leaves the flag off. A nil-check
      # would wrongly switch the worker flag on for the default `false`.
      # @param is_resque_worker [Boolean] (false) are we running in the context of a Resque worker?
      # @param is_scheduler [Boolean] (false) are we running in the context of a scheduler?
      def set(is_resque_worker: false, is_scheduler: false)
        @is_resque_worker = is_resque_worker == true
        @is_scheduler = is_scheduler == true
        @is_baw_workers_entry = true
      end

      # Configure the workers.
      #
      # Runs every configure_* step in a fixed order (later steps read values
      # assigned by earlier ones, e.g. the loggers and the audio helper), then
      # builds a summary hash of the resulting configuration and logs it.
      # @param core_logger [SemanticLogger::Logger] the base log to work with
      #   (forwarded to configure_loggers, which currently does not use it)
      # @param settings [Config::Options] the settings to use; must equal the global Settings
      # @raise [StandardError] if settings or Settings is nil, or if they differ
      def run_web(core_logger, settings)
        # assert settings is a singleton
        raise StandardError, 'run_web: Settings should have already been initialized' if settings.nil?
        raise StandardError, 'run_web: Settings were nil but should be defined' if Settings.nil?
        raise StandardError, 'run_web:  Settings should be identical to Settings' if settings != Settings

        # configure basic attributes first
        # (`true` selects temp_dir as the worker top directory)
        configure_paths(settings, true)
        configure_storage(settings)

        # configure logging
        configure_loggers(core_logger, settings)

        # needs logger_worker, so it must come after configure_loggers
        configure_upload_service(settings)

        # configure Resque
        configure_redis(settings)
        configure_resque(settings)

        # configure mailer
        configure_mailer(settings)

        # configure complex attributes
        configure_audio_helper(settings)
        configure_spectrogram_helper(settings)
        configure_api_communicator(settings)
        # file_info wraps the audio helper configured just above
        @file_info = BawWorkers::FileInfo.new(BawWorkers::Config.audio_helper)

        # configure resque worker and scheduler (only when flagged via `set`)
        configure_resque_worker if resque_worker?
        configure_scheduler if scheduler?

        # summarise the effective configuration
        result = format_result(settings)
        result = format_resque_worker(result, settings)

        check_resque_formatter

        log_info result
      end

      private

      # Describe how this process is running.
      # @param settings [Config::Options] the settings to inspect
      # @return [Symbol] :library when not started through `set`; otherwise
      #   :resque_foreground when no background pid file is configured, or
      #   :resque_background when one is
      def baw_workers_mode(settings)
        if !baw_workers_entry?
          :library
        elsif settings.resque.background_pid_file.blank?
          :resque_foreground
        else
          :resque_background
        end
      end

      # Configures the Redis connection used by our own Redis wrapper and
      # stores the wrapper in @redis_communicator.
      #
      # All values are read from the injected `settings` rather than the
      # global Settings, matching the other configure_* methods.
      # @param settings [Config::Options] the settings to use
      def configure_redis(settings)
        redis_settings = settings.redis
        communicator_redis = Redis.new(redis_settings.connection.to_h)

        # Set up standard redis wrapper.
        @redis_communicator = BawWorkers::RedisCommunicator.new(
          SemanticLogger[RedisCommunicator],
          communicator_redis,
          # options go here if defined
          {
            namespace: redis_settings.namespace
          }
        )
      end

      # Assign the temp, worker top-level and programs directories.
      # @param settings [Config::Options] the settings to use
      # @param default_used [Boolean] when truthy the worker top directory is
      #   the temp dir; otherwise it is the directory of the last settings file
      def configure_paths(settings, default_used)
        paths = settings.paths
        @temp_dir = paths.temp_dir
        @worker_top_dir =
          if default_used
            @temp_dir
          else
            # NOTE(review): the list of settings files is taken from
            # settings.sources (the same value format_result reports as
            # `files`); assumes each entry is a file path - confirm.
            Pathname(File.dirname(settings.sources.last))
          end
        @programs_dir = paths.programs_dir
      end

      # Build the four storage helpers, one per configured storage path list.
      # @param settings [Config::Options] the settings to use
      def configure_storage(settings)
        storage_paths = settings.paths

        @original_audio_helper =
          BawWorkers::Storage::AudioOriginal.new(storage_paths.original_audios)
        @audio_cache_helper =
          BawWorkers::Storage::AudioCache.new(storage_paths.cached_audios)
        @spectrogram_cache_helper =
          BawWorkers::Storage::SpectrogramCache.new(storage_paths.cached_spectrograms)
        @analysis_cache_helper =
          BawWorkers::Storage::AnalysisCache.new(storage_paths.cached_analysis_jobs)
      end

      # Build the upload service communicator from the upload_service settings.
      # Uses the worker logger, so the loggers must already be configured.
      # @param settings [Config::Options] the settings to use
      def configure_upload_service(settings)
        communicator_options = {
          config: settings.upload_service,
          logger: BawWorkers::Config.logger_worker
        }
        @upload_communicator = BawWorkers::UploadService::Communicator.new(**communicator_options)
      end

      # Create the named loggers and apply the log levels from settings.
      # Only the mailer and audio tools levels are set here; the worker
      # logger's level is left untouched.
      # @param _core_logger [SemanticLogger::Logger] unused
      # @param settings [Config::Options] the settings to use
      def configure_loggers(_core_logger, settings)
        @logger_worker = SemanticLogger['BawWorkers']
        @logger_mailer = SemanticLogger['BawWorkers::Mailer']
        @logger_audio_tools = SemanticLogger['BawAudioTools']

        # log levels are stored in settings as constant names
        mailer_level = settings.mailer.log_level.constantize
        @logger_mailer.level = mailer_level

        audio_tools_level = settings.audio_tools.log_level.constantize
        @logger_audio_tools.level = audio_tools_level
      end

      # Verify the mailer is wired to our mailer logger.
      # Nothing is configured here: all settings should be set by rails.
      # @param _settings [Config::Options] unused
      # @raise [RuntimeError] if the mailer's logger is not logger_mailer
      def configure_mailer(_settings)
        mailer_logger = BawWorkers::Mail::Mailer.logger
        return if mailer_logger == BawWorkers::Config.logger_mailer

        raise 'BawWorkers::Mail::Mailer logger incorrect'
      end

      # Build the audio helper from the configured audio tool executables.
      # Relies on the audio tools logger and temp dir already being set.
      # @param settings [Config::Options] the settings to use
      def configure_audio_helper(settings)
        defaults = settings.cached_audio_defaults
        tools_logger = BawWorkers::Config.logger_audio_tools
        working_dir = BawWorkers::Config.temp_dir
        timeout_seconds = settings.audio_tools_timeout_sec

        tools = settings.audio_tools
        executables = {
          ffmpeg: tools.ffmpeg_executable,
          ffprobe: tools.ffprobe_executable,
          mp3splt: tools.mp3splt_executable,
          sox: tools.sox_executable,
          wavpack: tools.wavpack_executable,
          shntool: tools.shntool_executable
        }

        @audio_helper = BawAudioTools::AudioBase.from_executables(
          defaults, tools_logger, working_dir, timeout_seconds, **executables
        )
      end

      # Build the spectrogram helper on top of the already configured audio
      # helper, using the ImageMagick executables from settings.
      # @param settings [Config::Options] the settings to use
      def configure_spectrogram_helper(settings)
        audio = BawWorkers::Config.audio_helper
        tools = settings.audio_tools
        convert = tools.imagemagick_convert_executable
        identify = tools.imagemagick_identify_executable
        defaults = settings.cached_spectrogram_defaults
        working_dir = BawWorkers::Config.temp_dir

        @spectrogram_helper = BawAudioTools::Spectrogram.from_executables(
          audio, convert, identify, defaults, working_dir
        )
      end

      # Build the API communicator from the api and endpoints settings.
      # Uses the worker logger, so the loggers must already be configured.
      # @param settings [Config::Options] the settings to use
      def configure_api_communicator(settings)
        worker_logger = BawWorkers::Config.logger_worker
        api_settings = settings.api
        endpoint_settings = settings.endpoints

        @api_communicator = BawWorkers::ApiCommunicator.new(worker_logger, api_settings, endpoint_settings)
      end

      # Point Resque at the configured Redis connection and namespace, hand
      # the underlying Redis client to the ActiveJob status persistence layer,
      # and apply the configured Resque log level.
      #
      # All values are read from the injected `settings` rather than mixing
      # it with the global Settings.
      # @param settings [Config::Options] the settings to use
      # @raise [RuntimeError] if Resque's logger is not a SemanticLogger::Logger
      def configure_resque(settings)
        resque_settings = settings.resque

        Resque.redis = ActiveSupport::HashWithIndifferentAccess.new(resque_settings.connection)
        Resque.redis.namespace = resque_settings.namespace
        BawWorkers::ActiveJob::Status::Persistance.configure(Resque.redis.redis)

        # Logger set automatically by SemanticLogger RailTie
        raise 'Resque logger not configured' unless Resque.logger.is_a?(SemanticLogger::Logger)

        Resque.logger.level = resque_settings.log_level.constantize
      end

      # Export the environment variables that control the Resque worker,
      # based on the resque section of the global Settings.
      def configure_resque_worker
        resque_settings = Settings.resque

        # a configured pid file means the worker runs in the background;
        # assigning nil removes the variable from the environment
        pid_file = resque_settings.background_pid_file
        run_in_background = !pid_file.blank?
        ENV['PIDFILE'] = run_in_background ? pid_file : nil
        ENV['BACKGROUND'] = run_in_background ? 'yes' : nil

        ENV['QUEUES'] = resque_settings.queues_to_process.join(',')
        ENV['INTERVAL'] = resque_settings.polling_interval_seconds.to_s

        # ensure resque does not kill workers with exit!
        # Note: do not set this, it will cause failures in retry_on
        #ENV['RUN_AT_EXIT_HOOKS'] = 'true'
        # use new killer, it's a little bit graceful
        %w[GRACEFUL_TERM TERM_CHILD].each { |flag| ENV[flag] = 'true' }
        # timeout for graceful death
        ENV['RESQUE_TERM_TIMEOUT'] = '10.0'

        # if BawApp.dev_or_test?
        #   Resque.after_fork do
        #     at_exit do
        #       pid = $PROCESS_ID
        #       tid = Thread.current.object_id
        #       error = $ERROR_INFO
        #       puts <<~MSG
        #         @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
        #         Worker exited! PID: #{pid} TID: #{tid}
        #         Caller: #{caller}
        #         Error: #{error}
        #         @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
        #       MSG
        #       exit!
        #     end
        #   end
        # end
      end

      # Configure the Resque scheduler from the global Settings: polling
      # interval, dynamic schedules, and a dedicated logger with its level.
      def configure_scheduler
        scheduler = Resque::Scheduler

        scheduler.configure do |config|
          config.poll_sleep_amount = Settings.resque.polling_interval_seconds
        end

        # A dynamic schedule can be updated via the
        # Resque::Scheduler.set_schedule (and remove_schedule) methods; the
        # scheduler process then looks for schedule changes and applies them
        # on the fly.
        # Note: This feature is only available in >=2.0.0.
        #
        # We don't use schedules, but if we do, it will be dynamic
        scheduler.dynamic = true

        scheduler.logger = SemanticLogger['Resque::Scheduler']
        scheduler.logger.level = Settings.resque_scheduler.log_level.constantize
      end

      # Detect Resque replacing our log formatter with its QuietFormatter.
      #
      # temporary hack - v1.26 of Resque overrides our default formatter.
      # This is the fix for the bug https://github.com/resque/resque/commit/eaaac2acc209456cdd0dd794d2d3714968cf76e4
      # The QuietFormatter literally just writes out an empty string whenever
      # a log statement is run. The overwrite could not be replicated in a dev
      # environment and is suspected to come from a call to .verbose, from
      # Resque.info, or from one of the other Resque functions used when the
      # result summary is built. It also happens when workers are created.
      #
      # NOTE(review): earlier commentary here said this should fail fast
      # rather than patch, but the code below only warns and then replaces the
      # worker logger's formatter - confirm which behaviour is intended.
      def check_resque_formatter
        if Resque.logger.formatter.is_a?(Resque::QuietFormatter)
          warn 'WARNING: Resque overwrote the default formatter!'

          # when this is no longer an issue remove exception in lib/gems/baw-workers/lib/baw_workers/multi_logger.rb#formatter=
          replacement = BawWorkers::MultiLogger::CustomFormatter.new
          BawWorkers::Config.logger_worker.formatter = replacement
        end
      end

      # Build a summary hash of the effective configuration for logging.
      # @param settings [Config::Options] the settings that were applied
      # @return [Hash] context, entry/scheduler flags, settings, redis,
      #   active_job and logging sections
      def format_result(settings)
        context = resque_worker? ? 'worker' : 'rails'
        entry = baw_workers_entry?
        scheduler = scheduler?

        settings_section = {
          test: BawApp.test?,
          environment: BawApp.env.to_s,
          files: settings.sources,
          run_dir: BawWorkers::Config.worker_top_dir,
          is_development: BawApp.development?
        }

        redis_section = {
          namespace: Resque.redis.namespace.to_s,
          connection: Resque.redis.connection,
          info: Resque.info
        }

        active_job_section = {
          status: {
            expire_in: BawWorkers::ActiveJob::Status::Persistance.expire_values
          }
        }

        logging_section = {
          worker: BawWorkers::Config.logger_worker.level,
          mailer: BawWorkers::Config.logger_mailer.level,
          audio_tools: BawWorkers::Config.logger_audio_tools.level,
          resque: Resque.logger.level
        }

        {
          context: context,
          baw_workers_entry: entry,
          scheduler: scheduler,
          settings: settings_section,
          redis: redis_section,
          active_job: active_job_section,
          logging: logging_section
        }
      end

      # Add a :resque_worker section to the summary hash (mutating it).
      # The environment-derived values are only reported for a Resque worker;
      # otherwise they are nil.
      # @param result [Hash] the summary to extend
      # @param settings [Config::Options] the settings used to work out the mode
      # @return [Hash] the same hash that was passed in
      def format_resque_worker(result, settings)
        running = resque_worker?
        worker_section = {
          running: running,
          mode: baw_workers_mode(settings),
          pid_file: nil,
          queues: nil,
          poll_interval: nil
        }

        if running
          worker_section[:pid_file] = ENV.fetch('PIDFILE', nil)
          worker_section[:queues] = ENV.fetch('QUEUES', nil)
          worker_section[:poll_interval] = ENV['INTERVAL'].to_f
        end

        result[:resque_worker] = worker_section
        result
      end

      # Write the configuration summary to the worker logger.
      # The entry is emitted at warn level with the message 'BawWorkers::Config'.
      # @param result [Hash] the summary to log
      def log_info(result)
        worker_logger = BawWorkers::Config.logger_worker
        worker_logger.warn('BawWorkers::Config') { result }
      end
    end
  end
end