cosmos/lib/cosmos/microservices/reducer_microservice.rb
# encoding: ascii-8bit

# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder

require 'cosmos/microservices/microservice'
require 'cosmos/topics/topic'
require 'cosmos/packets/json_packet'
require 'cosmos/utilities/s3_file_cache'
require 'cosmos/models/reducer_model'
require 'rufus-scheduler'

module Cosmos
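  # Reduces decommutated telemetry log files into successively coarser
  # time periods: DECOM files are reduced into minute files, minute files
  # into hour files, and hour files into day files. Each reduced entry
  # holds MIN, MAX, AVG, STDDEV, and SAMPLES values for every numeric item.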
class ReducerMicroservice < Microservice
MINUTE_METRIC = 'reducer_minute_duration'
HOUR_METRIC = 'reducer_hour_duration'
    DAY_METRIC = 'reducer_day_duration'

    # How long to wait for any currently running jobs to complete before killing them
    SHUTDOWN_DELAY_SECS = 5

    # The *_ENTRY_SECS constants define how much time each reduced entry spans
    # and the *_FILE_SECS constants define how much time each reduced log file spans
    MINUTE_ENTRY_SECS = 60
    MINUTE_FILE_SECS = 3600
    HOUR_ENTRY_SECS = 3600
    HOUR_FILE_SECS = 3600 * 24
    DAY_ENTRY_SECS = 3600 * 24
    DAY_FILE_SECS = 3600 * 24 * 30

# @param name [String] Microservice name formatted as <SCOPE>__REDUCER__<TARGET>
# where <SCOPE> and <TARGET> are variables representing the scope name and target name
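    # e.g. the name "DEFAULT__REDUCER__INST" reduces data for the INST
    # target within the DEFAULT scope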
def initialize(name)
super(name, is_plugin: false)
@target_name = name.split('__')[-1]
@packet_logs = {}
    end

    def run
      # Note it takes several seconds to create the scheduler
      @scheduler = Rufus::Scheduler.new

      # Run every minute
      @scheduler.cron '* * * * *', first: :now do
        reduce_minute
      end

      # Run every 15 minutes
      @scheduler.cron '*/15 * * * *', first: :now do
        reduce_hour
      end

      # Run hourly at minute 5 to allow the hour reducer to finish
      @scheduler.cron '5 * * * *', first: :now do
        reduce_day
      end

      # Let the current thread join the scheduler thread and
      # block until shutdown is called
      @scheduler.join
    end

    def shutdown
      @scheduler.shutdown(wait: SHUTDOWN_DELAY_SECS) if @scheduler

      # Make sure all the existing logs are properly closed down
      @packet_logs.each_value do |log|
        log.shutdown
      end
      super()
    end

    # Time the yielded block and record the duration as a metric sample
    def metric(name)
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      yield
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float
      @metric.add_sample(
        name: name,
        value: elapsed,
        labels: {
          'target' => @target_name,
        },
      )
    end

    # Reduce the decommutated log files for this target into minute entries
    def reduce_minute
      metric(MINUTE_METRIC) do
        ReducerModel
          .all_files(type: :DECOM, target: @target_name, scope: @scope)
          .each do |file|
            process_file(file, 'minute', MINUTE_ENTRY_SECS, MINUTE_FILE_SECS)
            ReducerModel.rm_file(file)
          end
      end
    end

    # Reduce the minute log files for this target into hour entries
    def reduce_hour
      metric(HOUR_METRIC) do
        ReducerModel
          .all_files(type: :MINUTE, target: @target_name, scope: @scope)
          .each do |file|
            process_file(file, 'hour', HOUR_ENTRY_SECS, HOUR_FILE_SECS)
            ReducerModel.rm_file(file)
          end
      end
    end

    # Reduce the hour log files for this target into day entries
    def reduce_day
      metric(DAY_METRIC) do
        ReducerModel
          .all_files(type: :HOUR, target: @target_name, scope: @scope)
          .each do |file|
            process_file(file, 'day', DAY_ENTRY_SECS, DAY_FILE_SECS)
            ReducerModel.rm_file(file)
          end
      end
    end

    def process_file(filename, type, entry_seconds, file_seconds)
      # Retrieve the file from the S3 bucket to a local temp file
      file = S3File.new(filename)
      file.retrieve

      # Filenames are formatted as
      # <startTime>__<endTime>__<scope>__<target>__<packet>__...
      start_time, end_time, scope, target_name, packet_name, _ =
        filename.split('__')
      if @target_name != target_name
        raise "Target name in file #{filename} does not match microservice target name #{@target_name}"
      end

      # Determine if we already have a PacketLogWriter created
      plw = @packet_logs["#{scope}__#{target_name}__#{packet_name}__#{type}"]
unless plw
# Create a new PacketLogWriter for this reduced data
# e.g. DEFAULT/reduced_minute_logs/tlm/INST/HEALTH_STATUS/20220101/
# 20220101204857274290500__20220101205857276524900__DEFAULT__INST__HEALTH_STATUS__reduced__minute.bin
remote_log_directory = "#{scope}/reduced_#{type}_logs/tlm/#{target_name}/#{packet_name}"
rt_label = "#{scope}__#{target_name}__#{packet_name}__reduced__#{type}"
plw = PacketLogWriter.new(remote_log_directory, rt_label)
@packet_logs["#{scope}__#{target_name}__#{packet_name}__#{type}"] = plw
end
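      # The cached writer persists across calls so additional files for the
      # same packet continue appending entries to the same reduced log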

      # Accumulator state for the entry currently being built
      reduced = {}
      data_keys = nil
      entry_time = nil
      current_time = nil
      previous_time = nil

      plr = Cosmos::PacketLogReader.new
plr.each(file.local_path) do |packet|
        # Ignore anything that isn't a number, e.g. STRING or BLOCK items
        data = packet.read_all(:RAW).select { |_key, value| value.is_a?(Numeric) }
        converted_data = packet.read_all(:CONVERTED).select { |_key, value| value.is_a?(Numeric) }
        # Merge in the converted data which overwrites the raw values
        data.merge!(converted_data)
previous_time = current_time
current_time = packet.packet_time.to_f
entry_time ||= current_time
data_keys ||= data.keys
        # Determine if we've rolled over an entry boundary.
        # We have to use current % entry_seconds < previous % entry_seconds because
        # we don't know the data rates. We also have to check for current - previous >= entry_seconds
        # in case the data rate is so slow we don't have multiple samples per entry.
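        # Hypothetical worked example with entry_seconds = 60:
        #   previous_time = 119.5 -> 119.5 % 60 = 59.5
        #   current_time  = 121.0 -> 121.0 % 60 = 1.0
        #   1.0 < 59.5, so a new 60 second entry boundary was crossed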
if previous_time &&
(
(current_time % entry_seconds < previous_time % entry_seconds) ||
(current_time - previous_time >= entry_seconds)
)
Logger.debug("Reducer: Roll over entry boundary cur_time:#{current_time}")
reduce(type, data_keys, reduced)
plw.write(
:JSON_PACKET,
:TLM,
target_name,
packet_name,
entry_time * Time::NSEC_PER_SECOND,
false,
JSON.generate(reduced.as_json),
)
# Reset all our sample variables
entry_time = current_time
reduced = {}
# Check to see if we should start a new log file
# We compare the current entry_time to see if it will push us over
if plw.first_time &&
(entry_time - plw.first_time.to_f) >= file_seconds
Logger.debug("Reducer: (1) start new file! old filename: #{plw.filename}")
plw.start_new_file # Automatically closes the current file
end
end
# Update statistics for this packet's values
data.each do |key, value|
if type == 'minute'
reduced["#{key}__VALS"] ||= []
reduced["#{key}__VALS"] << value
reduced["#{key}_MIN"] ||= value
reduced["#{key}_MIN"] = value if value < reduced["#{key}_MIN"]
reduced["#{key}_MAX"] ||= value
reduced["#{key}_MAX"] = value if value > reduced["#{key}_MAX"]
else
reduced[key] ||= value
reduced[key] = value if key.match(/_MIN$/) && value < reduced[key]
reduced[key] = value if key.match(/_MAX$/) && value > reduced[key]
            # Collect AVG, STDDEV, and SAMPLES values so they can be aggregated
            # once the entry is complete
            if key.match(/_(AVG|STDDEV|SAMPLES)$/)
              reduced["#{key}__VALS"] ||= []
              reduced["#{key}__VALS"] << value
            end
end
end
end
file.delete # Remove the local copy
# See if this last entry should go in a new file
if plw.first_time &&
(entry_time - plw.first_time.to_f) >= file_seconds
Logger.debug("Reducer: (2) start new file! old filename: #{plw.filename}")
plw.start_new_file # Automatically closes the current file
end
# Write out the final data now that the file is done
reduce(type, data_keys, reduced)
plw.write(
:JSON_PACKET,
:TLM,
target_name,
packet_name,
entry_time * Time::NSEC_PER_SECOND,
false,
JSON.generate(reduced.as_json),
)
true
rescue => e
if file.local_path and File.exist?(file.local_path)
Logger.error("Reducer Error: #{filename}:#{File.size(file.local_path)} bytes: \n#{e.formatted}")
else
Logger.error("Reducer Error: #{filename}:(Not Retrieved): \n#{e.formatted}")
end
false
end

    def reduce(type, data_keys, reduced)
# We've collected all the values so calculate the AVG and STDDEV
if type == 'minute'
data_keys.each do |key|
reduced["#{key}_SAMPLES"] = reduced["#{key}__VALS"].length
reduced["#{key}_AVG"], reduced["#{key}_STDDEV"] =
Math.stddev_population(reduced["#{key}__VALS"])
# Remove the raw values as they're only used for AVG / STDDEV calculation
reduced.delete("#{key}__VALS")
end
else
        # Sort so we calculate the average first, then samples, then stddev
        # (the keys sort alphabetically: AVG, MAX, MIN, SAMPLES, STDDEV)
        data_keys.sort.each do |key|
          base_name = key.split('_')[0..-2].join('_') # e.g. "TEMP1_AVG" => "TEMP1"
case key
when /_AVG$/
weighted_sum = 0
samples = reduced["#{base_name}_SAMPLES__VALS"]
reduced["#{key}__VALS"].each_with_index do |val, i|
weighted_sum += (val * samples[i])
end
reduced[key] = weighted_sum / samples.sum
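            # Hypothetical example: entries with AVG values [1.0, 3.0] and
            # SAMPLES values [60, 120] reduce to (1.0 * 60 + 3.0 * 120) / 180,
            # i.e. ~2.33 rather than the unweighted mean of 2.0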
when /_SAMPLES$/
reduced[key] = reduced["#{base_name}_SAMPLES__VALS"].sum
when /_STDDEV$/
# Do the STDDEV calc last so we can use the previously calculated AVG
# See https://math.stackexchange.com/questions/1547141/aggregating-standard-deviation-to-a-summary-point
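            # Given source groups i with n_i samples, mean u_i, and stddev o_i,
            # the combined variance over N = sum(n_i) samples is:
            #   var = sum(n_i * (u_i**2 + o_i**2)) / N - u**2
            # where u is the combined average calculated in the _AVG branch above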
samples = reduced["#{base_name}_SAMPLES__VALS"]
avg = reduced["#{base_name}_AVG__VALS"]
s2 = 0
reduced["#{key}__VALS"].each_with_index do |val, i|
# puts "i:#{i} val:#{val} samples[i]:#{samples[i]} avg[i]:#{avg[i]}"
              s2 += (samples[i] * (avg[i]**2 + val**2))
end
            # Note: For very large numbers with very small deviations the variance
            # term can go slightly negative due to floating point error and the
            # sqrt will fail. If so then just set the stddev to 0.
            begin
              reduced[key] =
                Math.sqrt(s2 / samples.sum - reduced["#{base_name}_AVG"]**2)
            rescue Exception
              reduced[key] = 0.0
            end
end
end
data_keys.each do |key|
# Remove the raw values as they're only used for AVG / STDDEV calculation
reduced.delete("#{key}__VALS")
end
end
end
end
end

Cosmos::ReducerMicroservice.run if __FILE__ == $0