BallAerospace/COSMOS

cosmos/lib/cosmos/microservices/reducer_microservice.rb
# encoding: ascii-8bit

# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder

require 'cosmos/microservices/microservice'
require 'cosmos/topics/topic'
require 'cosmos/packets/json_packet'
require 'cosmos/utilities/s3_file_cache'
require 'cosmos/models/reducer_model'
require 'rufus-scheduler'

module Cosmos
  class ReducerMicroservice < Microservice
    MINUTE_METRIC = 'reducer_minute_duration'
    HOUR_METRIC = 'reducer_hour_duration'
    DAY_METRIC = 'reducer_day_duration'

    # How long to wait for any currently running jobs to complete before killing them
    SHUTDOWN_DELAY_SECS = 5
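    # Each reduced entry spans *_ENTRY_SECS of data and each reduced log file
    # spans *_FILE_SECS of data before a new file is started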
    MINUTE_ENTRY_SECS = 60
    MINUTE_FILE_SECS = 3600
    HOUR_ENTRY_SECS = 3600
    HOUR_FILE_SECS = 3600 * 24
    DAY_ENTRY_SECS = 3600 * 24
    DAY_FILE_SECS = 3600 * 24 * 30

    # @param name [String] Microservice name formatted as <SCOPE>__REDUCER__<TARGET>
    #   where <SCOPE> and <TARGET> are variables representing the scope name and target name
    def initialize(name)
      super(name, is_plugin: false)
      @target_name = name.split('__')[-1]
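      # PacketLogWriters for the reduced data keyed by <SCOPE>__<TARGET>__<PACKET>__<type>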
      @packet_logs = {}
    end

    def run
      # Note it takes several seconds to create the scheduler
      @scheduler = Rufus::Scheduler.new
      # Run every minute
      @scheduler.cron '* * * * *', first: :now do
        reduce_minute
      end
      # Run every 15 minutes
      @scheduler.cron '*/15 * * * *', first: :now do
        reduce_hour
      end
      # Run hourly at minute 5 to allow the hour reducer to finish
      @scheduler.cron '5 * * * *', first: :now do
        reduce_day
      end

      # Let the current thread join the scheduler thread and
      # block until shutdown is called
      @scheduler.join
    end

    def shutdown
      @scheduler.shutdown(wait: SHUTDOWN_DELAY_SECS) if @scheduler

      # Make sure all the existing logs are properly closed down
      @packet_logs.each do |name, log|
        log.shutdown
      end
      super()
    end

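    # Time the yielded block and report the elapsed seconds as a metric sample
    # labeled with the target name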
    def metric(name)
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      yield
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float
      @metric.add_sample(
        name: name,
        value: elapsed,
        labels: {
          'target' => @target_name,
        },
      )
    end

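    # Reduce the decommutated (DECOM) log files into minute entries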
    def reduce_minute
      metric(MINUTE_METRIC) do
        ReducerModel
          .all_files(type: :DECOM, target: @target_name, scope: @scope)
          .each do |file|
            process_file(file, 'minute', MINUTE_ENTRY_SECS, MINUTE_FILE_SECS)
            ReducerModel.rm_file(file)
          end
      end
    end

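    # Reduce the minute log files into hour entries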
    def reduce_hour
      metric(HOUR_METRIC) do
        ReducerModel
          .all_files(type: :MINUTE, target: @target_name, scope: @scope)
          .each do |file|
            process_file(file, 'hour', HOUR_ENTRY_SECS, HOUR_FILE_SECS)
            ReducerModel.rm_file(file)
          end
      end
    end

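    # Reduce the hour log files into day entries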
    def reduce_day
      metric(DAY_METRIC) do
        ReducerModel
          .all_files(type: :HOUR, target: @target_name, scope: @scope)
          .each do |file|
            process_file(file, 'day', DAY_ENTRY_SECS, DAY_FILE_SECS)
            ReducerModel.rm_file(file)
          end
      end
    end

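    # Reduce a single file of the given type: write a reduced entry every
    # entry_seconds of data and start a new reduced log file every file_seconds of data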
    def process_file(filename, type, entry_seconds, file_seconds)
      file = S3File.new(filename)
      file.retrieve

      # Determine if we already have a PacketLogWriter created
      start_time, end_time, scope, target_name, packet_name, _ =
        filename.split('__')
      if @target_name != target_name
        raise "Target name in file #{filename} does not match microservice target name #{@target_name}"
      end
      plw = @packet_logs["#{scope}__#{target_name}__#{packet_name}__#{type}"]
      unless plw
        # Create a new PacketLogWriter for this reduced data
        # e.g. DEFAULT/reduced_minute_logs/tlm/INST/HEALTH_STATUS/20220101/
        # 20220101204857274290500__20220101205857276524900__DEFAULT__INST__HEALTH_STATUS__reduced__minute.bin
        remote_log_directory = "#{scope}/reduced_#{type}_logs/tlm/#{target_name}/#{packet_name}"
        rt_label = "#{scope}__#{target_name}__#{packet_name}__reduced__#{type}"
        plw = PacketLogWriter.new(remote_log_directory, rt_label)
        @packet_logs["#{scope}__#{target_name}__#{packet_name}__#{type}"] = plw
      end

      reduced = {}
      data_keys = nil
      entry_time = nil
      current_time = nil
      previous_time = nil
      plr = Cosmos::PacketLogReader.new
      plr.each(file.local_path) do |packet|
        # Ignore anything that's not a number, e.g. STRING or BLOCK items
        data = packet.read_all(:RAW).select { |key, value| value.is_a?(Numeric) }
        converted_data = packet.read_all(:CONVERTED).select { |key, value| value.is_a?(Numeric) }
        # Merge in the converted data which overwrites the raw
        data.merge!(converted_data)

        previous_time = current_time
        current_time = packet.packet_time.to_f
        entry_time ||= current_time
        data_keys ||= data.keys

        # Determine if we've rolled over an entry boundary
        # We have to use current % entry_seconds < previous % entry_seconds because
        # we don't know the data rates. We also have to check for current - previous >= entry_seconds
        # in case the data rate is so slow we don't have multiple samples per entry
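        # e.g. with entry_seconds = 60: previous_time = 119.9 (119.9 % 60 = 59.9) and
        # current_time = 120.5 (120.5 % 60 = 0.5) so 0.5 < 59.9 and we start a new entry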
        if previous_time &&
             (
               (current_time % entry_seconds < previous_time % entry_seconds) ||
                 (current_time - previous_time >= entry_seconds)
             )
          Logger.debug("Reducer: Roll over entry boundary cur_time:#{current_time}")

          reduce(type, data_keys, reduced)
          plw.write(
            :JSON_PACKET,
            :TLM,
            target_name,
            packet_name,
            entry_time * Time::NSEC_PER_SECOND,
            false,
            JSON.generate(reduced.as_json),
          )
          # Reset all our sample variables
          entry_time = current_time
          reduced = {}

          # Check to see if we should start a new log file
          # We compare the current entry_time to the file's first time to see if it exceeds file_seconds
          if plw.first_time &&
               (entry_time - plw.first_time.to_f) >= file_seconds
            Logger.debug("Reducer: (1) start new file! old filename: #{plw.filename}")
            plw.start_new_file # Automatically closes the current file
          end
        end

        # Update statistics for this packet's values
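        # Minute reduction collects the raw values plus running MIN / MAX,
        # while hour / day reduction combines the already reduced
        # _MIN / _MAX / _AVG / _STDDEV / _SAMPLES items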
        data.each do |key, value|
          if type == 'minute'
            reduced["#{key}__VALS"] ||= []
            reduced["#{key}__VALS"] << value
            reduced["#{key}_MIN"] ||= value
            reduced["#{key}_MIN"] = value if value < reduced["#{key}_MIN"]
            reduced["#{key}_MAX"] ||= value
            reduced["#{key}_MAX"] = value if value > reduced["#{key}_MAX"]
          else
            reduced[key] ||= value
            reduced[key] = value if key.match(/_MIN$/) && value < reduced[key]
            reduced[key] = value if key.match(/_MAX$/) && value > reduced[key]
            if key.match(/_AVG$/)
              reduced["#{key}__VALS"] ||= []
              reduced["#{key}__VALS"] << value
            end
            if key.match(/_STDDEV$/)
              reduced["#{key}__VALS"] ||= []
              reduced["#{key}__VALS"] << value
            end
            if key.match(/_SAMPLES$/)
              reduced["#{key}__VALS"] ||= []
              reduced["#{key}__VALS"] << value
            end
          end
        end
      end
      file.delete # Remove the local copy

      # See if this last entry should go in a new file
      if plw.first_time &&
        (entry_time - plw.first_time.to_f) >= file_seconds
        Logger.debug("Reducer: (2) start new file! old filename: #{plw.filename}")
        plw.start_new_file # Automatically closes the current file
      end

      # Write out the final data now that the file is done
      reduce(type, data_keys, reduced)
      plw.write(
        :JSON_PACKET,
        :TLM,
        target_name,
        packet_name,
        entry_time * Time::NSEC_PER_SECOND,
        false,
        JSON.generate(reduced.as_json),
      )
      true
    rescue => e
      if file && file.local_path && File.exist?(file.local_path)
        Logger.error("Reducer Error: #{filename}:#{File.size(file.local_path)} bytes: \n#{e.formatted}")
      else
        Logger.error("Reducer Error: #{filename}:(Not Retrieved): \n#{e.formatted}")
      end
      false
    end

    def reduce(type, data_keys, reduced)
      # We've collected all the values so calculate the AVG and STDDEV
      if type == 'minute'
        data_keys.each do |key|
          reduced["#{key}_SAMPLES"] = reduced["#{key}__VALS"].length
          reduced["#{key}_AVG"], reduced["#{key}_STDDEV"] =
            Math.stddev_population(reduced["#{key}__VALS"])

          # Remove the raw values as they're only used for AVG / STDDEV calculation
          reduced.delete("#{key}__VALS")
        end
      else
        # Sort so we calculate the average first, then samples, then stddev
        data_keys.sort.each do |key|
          base_name = key.split('_')[0..-2].join('_')
          case key
          when /_AVG$/
            weighted_sum = 0
            samples = reduced["#{base_name}_SAMPLES__VALS"]
            reduced["#{key}__VALS"].each_with_index do |val, i|
              weighted_sum += (val * samples[i])
            end
            reduced[key] = weighted_sum / samples.sum
          when /_SAMPLES$/
            reduced[key] = reduced["#{base_name}_SAMPLES__VALS"].sum
          when /_STDDEV$/
            # Do the STDDEV calc last so we can use the previously calculated AVG
            # See https://math.stackexchange.com/questions/1547141/aggregating-standard-deviation-to-a-summary-point
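            # Aggregated variance = sum(n_i * (mu_i^2 + sigma_i^2)) / N - mu^2
            # where n_i, mu_i, sigma_i are each sub-entry's SAMPLES, AVG and STDDEV values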
            samples = reduced["#{base_name}_SAMPLES__VALS"]
            avg = reduced["#{base_name}_AVG__VALS"]
            s2 = 0
            reduced["#{key}__VALS"].each_with_index do |val, i|
              # puts "i:#{i} val:#{val} samples[i]:#{samples[i]} avg[i]:#{avg[i]}"
              s2 += samples[i] * (avg[i]**2 + val**2)
            end

            # Note: For very large numbers with very small deviations this sqrt can fail.
            # If so then just set the stddev to 0.
            begin
              reduced[key] =
                Math.sqrt(s2 / samples.sum - reduced["#{base_name}_AVG"]**2)
            rescue Exception
              reduced[key] = 0.0
            end
          end
        end
        data_keys.each do |key|
          # Remove the raw values as they're only used for AVG / STDDEV calculation
          reduced.delete("#{key}__VALS")
        end
      end
    end
  end
end

Cosmos::ReducerMicroservice.run if __FILE__ == $0