cosmos-cmd-tlm-api/app/models/streaming_api.rb
# encoding: ascii-8bit
# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder
# TODO : Handoff to realtime thread
require 'base64'
require 'cosmos'
Cosmos.require_file 'cosmos/packets/packet'
Cosmos.require_file 'cosmos/utilities/store'
Cosmos.require_file 'cosmos/utilities/s3_file_cache'
Cosmos.require_file 'cosmos/packets/json_packet'
Cosmos.require_file 'cosmos/logs/packet_log_reader'
Cosmos.require_file 'cosmos/utilities/authorization'
class StreamingThread
def initialize(channel, collection, stream_mode, max_batch_size = 100)
# Cosmos::Logger.level = Cosmos::Logger::DEBUG
@channel = channel
@collection = collection
@max_batch_size = max_batch_size
@cancel_thread = false
@thread = nil
@stream_mode = stream_mode
end
def start
@thread = Thread.new do
while true
break if @cancel_thread
thread_body()
break if @cancel_thread
end
rescue => err
Cosmos::Logger.error "#{self.class.name} unexpectedly died\n#{err.formatted}"
end
end
def alive?
if @thread
@thread.alive?
else
false
end
end
def thread_body
raise "Must be defined by subclasses"
end
def stop
@cancel_thread = true
end
def transmit_results(results, force: false)
if results.length > 0 or force
# Fortify: This send is intentionally bypassing access control to get to the
# private transmit method
@channel.send(:transmit, JSON.generate(results.as_json))
end
end
def redis_thread_body(topics, offsets, objects_by_topic)
# Cosmos::Logger.debug "#{self.class} redis_thread_body topics:#{topics} offsets:#{offsets} objects:#{objects_by_topic}"
results = []
if topics.length > 0
rtr = Cosmos::Topic.read_topics(topics, offsets) do |topic, msg_id, msg_hash, redis|
# Cosmos::Logger.debug "read_topics topic:#{topic} offsets:#{offsets} id:#{msg_id} msg time:#{msg_hash['time']}"
objects = objects_by_topic[topic]
objects.each do |object|
object.offset = msg_id
end
results_by_value_type = []
value_types = objects.group_by { |object| object.value_type }
value_types.each_value do |value|
results_by_value_type << handle_message(topic, msg_id, msg_hash, redis, value)
end
results_by_value_type.compact!
if results_by_value_type.length > 0
results.concat(results_by_value_type)
else
break results
end
if results.length > @max_batch_size
transmit_results(results)
results.clear
end
break results if @cancel_thread
results
end
# If we're no longer grabbing packets from the stream (empty result)
# we check to see if we need to continue
# Cosmos::Logger.debug "rtr:#{rtr} empty?:#{rtr.empty?} results:#{results} topics:#{topics} offsets:#{offsets}"
if rtr.nil? or rtr.empty?
topics.each do |topic|
objects = objects_by_topic[topic]
objects.each do |object|
keys = []
# If time has passed the end_time and we're still not getting anything we're done
if object.end_time and object.end_time < Time.now.to_nsec_from_epoch
keys << object.key
@cancel_thread = true
end
@collection.remove(keys)
end
end
end
transmit_results(results, force: @collection.empty?)
transmit_results([], force: true) if !results.empty? and @collection.empty?
else
sleep(1)
end
end
def handle_message(topic, msg_id, msg_hash, redis, objects)
topic_without_hashtag = topic.gsub(/{|}/, '') # This removes all curly braces, and we don't allow curly braces in our keys
first_object = objects[0]
time = msg_hash['time'].to_i
if @stream_mode == :RAW
return handle_raw_packet(msg_hash['buffer'], objects, time, topic_without_hashtag)
else # @stream_mode == :DECOM
json_packet = Cosmos::JsonPacket.new(first_object.cmd_or_tlm, first_object.target_name, first_object.packet_name,
time, Cosmos::ConfigParser.handle_true_false(msg_hash["stored"]), msg_hash["json_data"])
return handle_json_packet(json_packet, objects, topic_without_hashtag)
end
end
def handle_json_packet(json_packet, objects, topic)
time = json_packet.packet_time
keys_remain = objects_active?(objects, time.to_nsec_from_epoch)
return nil unless keys_remain
result = {}
objects.each do |object|
# Cosmos::Logger.debug("item:#{object.item_name} key:#{object.key} type:#{object.value_type}")
if object.item_name
result[object.key] = json_packet.read(object.item_name, object.value_type)
else # whole packet
this_packet = json_packet.read_all(object.value_type)
result = result.merge(this_packet)
result['packet'] = topic + "__" + object.value_type.to_s
end
end
result['time'] = time.to_nsec_from_epoch
return result
end
def handle_raw_packet(buffer, objects, time, topic)
keys_remain = objects_active?(objects, time)
return nil unless keys_remain
return {
packet: topic,
buffer: Base64.encode64(buffer),
time: time
}
end
def objects_active?(objects, time)
first_object = objects[0]
if first_object.end_time and time > first_object.end_time
# These objects have expired and are removed from the collection
keys = []
objects.each do |object|
keys << object.key
end
@collection.remove(keys)
return false
end
return true
end
end
class LoggedStreamingThread < StreamingThread
ALLOWABLE_START_TIME_OFFSET_NSEC = 60 * Time::NSEC_PER_SECOND
def initialize(thread_id, channel, collection, stream_mode, max_batch_size = 100, scope:)
super(channel, collection, stream_mode, max_batch_size)
@thread_id = thread_id
@thread_mode = :SETUP
# Reduced has no Redis streams so go direct to file
@thread_mode = :FILE if stream_mode.to_s.upcase.include?("REDUCED")
@scope = scope
end
def thread_body
objects = @collection.objects_by_thread_id[@thread_id]
# Cancel if we don't have any objects ... this can happen as things are processed
# or if someone calls remove() from the StreamingApi
@cancel_thread = true unless objects and objects.length > 0
return if @cancel_thread
first_object = objects[0]
if @thread_mode == :SETUP
# Get the newest message because we only stream if there is data after our start time
_, msg_hash_new = Cosmos::Topic.get_newest_message(first_object.topic)
# Cosmos::Logger.debug "first time:#{first_object.start_time} newest:#{msg_hash_new['time']}"
# Allow 1 minute in the future to account for big time discrepancies, which may be caused by:
# - the JavaScript client using the machine's local time, which might not be set with NTP
# - browser security settings rounding the value within a few milliseconds
allowable_start_time = first_object.start_time - ALLOWABLE_START_TIME_OFFSET_NSEC
if msg_hash_new && msg_hash_new['time'].to_i > allowable_start_time
# Determine oldest timestamp in stream to determine if we need to go to file
msg_id, msg_hash = Cosmos::Topic.get_oldest_message(first_object.topic)
oldest_time = msg_hash['time'].to_i
# Cosmos::Logger.debug "first start time:#{first_object.start_time} oldest:#{oldest_time}"
if first_object.start_time < oldest_time
# Stream from Files
@thread_mode = :FILE
else
# Stream from Redis
# Guesstimate start offset in stream based on first packet time and redis time
redis_time = msg_id.split('-')[0].to_i * 1_000_000
delta = redis_time - oldest_time
# Start streaming from calculated redis time
offset = ((first_object.start_time + delta) / 1_000_000).to_s + '-0'
# Cosmos::Logger.debug "stream from Redis offset:#{offset} redis_time:#{redis_time} delta:#{delta}"
objects.each {|object| object.offset = offset}
@thread_mode = :STREAM
end
else
# Since we're not going to transmit anything cancel and transmit an empty result
# Cosmos::Logger.debug "NO DATA DONE! transmit 0 results"
@cancel_thread = true
transmit_results([], force: true)
end
elsif @thread_mode == :STREAM
objects_by_topic = { objects[0].topic => objects }
redis_thread_body([first_object.topic], [first_object.offset], objects_by_topic)
else # @thread_mode == :FILE
# Get next file from file cache
file_end_time = first_object.end_time
file_end_time = Time.now.to_nsec_from_epoch unless file_end_time
file_path = S3FileCache.instance.reserve_file(first_object.cmd_or_tlm, first_object.target_name, first_object.packet_name,
first_object.start_time, file_end_time, @stream_mode, scope: @scope) # TODO: look at how @stream_mode is being used
if file_path
file_path_split = File.basename(file_path).split("__")
file_end_time = DateTime.strptime(file_path_split[1], S3FileCache::TIMESTAMP_FORMAT).to_f * Time::NSEC_PER_SECOND # TODO: get format from different class' constant?
# Scan forward to find first packet needed
# Stream forward until packet > end_time or no more packets
results = []
plr = Cosmos::PacketLogReader.new()
topic_without_hashtag = first_object.topic.gsub(/{|}/, '') # This removes all curly braces, and we don't allow curly braces in our keys
done = plr.each(file_path, false, Time.from_nsec_from_epoch(first_object.start_time), Time.from_nsec_from_epoch(first_object.end_time)) do |packet|
time = packet.received_time if packet.respond_to? :received_time
time ||= packet.packet_time
result = nil
if @stream_mode == :RAW
result = handle_raw_packet(packet.buffer, objects, time.to_nsec_from_epoch, topic_without_hashtag)
else # @stream_mode == :DECOM
result = handle_json_packet(packet, objects, topic_without_hashtag)
end
if result
results << result
else
break
end
if results.length > @max_batch_size
transmit_results(results)
results.clear
end
break if @cancel_thread
end
transmit_results(results)
@last_file_redis_offset = plr.redis_offset
# Move to the next file
S3FileCache.instance.unreserve_file(file_path)
objects.each {|object| object.start_time = file_end_time}
if done # We reached the end time
@cancel_thread = true
transmit_results([], force: true)
end
else
Cosmos::Logger.info "Switch stream from file to Redis"
# TODO: What if there is no new data in the Redis stream?
# Switch to stream from Redis
# Determine oldest timestamp in stream
msg_id, msg_hash = Cosmos::Topic.get_oldest_message(first_object.topic)
if msg_hash
oldest_time = msg_hash['time'].to_i
# Stream from Redis
offset = @last_file_redis_offset if @last_file_redis_offset
if !offset
# Guesstimate start offset in stream based on first packet time and redis time
redis_time = msg_id.split('-')[0].to_i * 1000000
delta = redis_time - oldest_time
# Start streaming from calculated redis time
offset = ((first_object.start_time + delta) / 1_000_000).to_s + '-0'
end
Cosmos::Logger.debug "Oldest Redis id:#{msg_id} msg time:#{oldest_time} last object time:#{first_object.start_time} offset:#{offset}"
objects.each {|object| object.offset = offset}
@thread_mode = :STREAM
else
@cancel_thread = true
end
end
end
# Transfers item to realtime thread when complete (if continued)
# Needs to mutex transfer
# checks if equal offset if packet already exists in realtime
# if doesn't exist adds with item offset
# if does exist and equal - transfer
# if does exist and less than - add item with less offset
# if does exist and greater than - catch up and try again
end
end
class RealtimeStreamingThread < StreamingThread
def thread_body
topics, offsets, objects_by_topic = @collection.realtime_topics_offsets_and_objects
redis_thread_body(topics, offsets, objects_by_topic)
end
end
class StreamingApi
include Cosmos::Authorization
# Helper class to store information about the streaming item
class StreamingObject
include Cosmos::Authorization
attr_accessor :key
attr_accessor :cmd_or_tlm
attr_accessor :target_name
attr_accessor :packet_name
attr_accessor :item_name
attr_accessor :value_type
attr_accessor :start_time
attr_accessor :end_time
attr_accessor :offset
attr_accessor :topic
attr_accessor :thread_id
def initialize(key, start_time, end_time, thread_id = nil, stream_mode:, scope:, token: nil)
@key = key
key_split = key.split('__')
@cmd_or_tlm = key_split[0].to_s.intern
@scope = scope
@target_name = key_split[1]
@packet_name = key_split[2]
type = nil
if stream_mode == :RAW
# value_type is implied to be :RAW and this must be a whole packet
@value_type = :RAW
type = (@cmd_or_tlm == :CMD) ? 'COMMAND' : 'TELEMETRY'
elsif stream_mode == :DECOM
type = (@cmd_or_tlm == :CMD) ? 'DECOMCMD' : 'DECOM'
# If our value type is the 4th param we're streaming a packet, otherwise item
if Cosmos::Packet::VALUE_TYPES.include?(key_split[3].intern)
@value_type = key_split[3].intern
else
@item_name = key_split[3]
@value_type = key_split[4].intern
end
else # Reduced
type = stream_mode
# Reduced items are passed as TGT__PKT__ITEM_REDUCETYPE__VALUETYPE
# e.g. INST__HEALTH_STATUS__TEMP1_AVG__CONVERTED
# Note there is NOT a double underscore between item name and reduce type
@item_name = key_split[3]
@value_type = key_split[4].intern
end
@start_time = start_time
@end_time = end_time
authorize(permission: @cmd_or_tlm.to_s.downcase, target_name: @target_name, packet_name: @packet_name, scope: scope, token: token)
@topic = "#{@scope}__#{type}__{#{@target_name}}__#{@packet_name}"
@offset = nil
@offset = Cosmos::Topic.get_last_offset(@topic) unless @start_time
Cosmos::Logger.info("Streaming from #{@topic} start:#{@start_time} end:#{@end_time} offset:#{@offset}")
@thread_id = thread_id
end
end
# Helper class to collect StreamingObjects and map them to threads
class StreamingObjectCollection
attr_reader :objects_by_thread_id
def initialize
@objects_by_key = {}
@objects_by_thread_id = {}
@objects_by_thread_id[nil] = []
@mutex = Mutex.new
end
def add(objects)
@mutex.synchronize do
objects.each do |object|
existing_object = @objects_by_key[object.key]
if existing_object
@objects_by_thread_id[existing_object.thread_id].delete(existing_object)
end
@objects_by_key[object.key] = object
@objects_by_thread_id[object.thread_id] ||= []
@objects_by_thread_id[object.thread_id] << object
end
end
end
def remove(keys)
@mutex.synchronize do
keys.each do |key|
object = @objects_by_key[key]
if object
@objects_by_key.delete(key)
@objects_by_thread_id[object.thread_id].delete(object)
end
end
end
end
def realtime_topics_offsets_and_objects
topics_and_offsets = {}
objects_by_topic = {}
@mutex.synchronize do
@objects_by_thread_id[nil].each do |object|
if object.start_time == nil
offset = topics_and_offsets[object.topic]
topics_and_offsets[object.topic] = object.offset if !offset or object.offset < offset
objects_by_topic[object.topic] ||= []
objects_by_topic[object.topic] << object
end
end
end
return topics_and_offsets.keys, topics_and_offsets.values, objects_by_topic
end
def length
return @objects_by_key.length
end
def empty?
length() == 0
end
end
def initialize(uuid, channel, scope: nil, token: nil)
authorize(permission: 'tlm', scope: scope, token: token)
@thread_id = 1
@uuid = uuid
@channel = channel
@mutex = Mutex.new
@collection = StreamingObjectCollection.new
@realtime_thread = nil
@logged_threads = []
# Cosmos::Logger.level = Cosmos::Logger::DEBUG
end
def add(data)
# Cosmos::Logger.debug "start:#{Time.at(data["start_time"].to_i/1_000_000_000.0).formatted}" if data["start_time"]
# Cosmos::Logger.debug "end:#{Time.at(data["end_time"].to_i/1_000_000_000.0).formatted}" if data["end_time"]
@mutex.synchronize do
start_time = nil
start_time = data["start_time"].to_i if data["start_time"]
end_time = nil
end_time = data["end_time"].to_i if data["end_time"]
stream_mode = data["mode"].to_s.intern
scope = data["scope"]
token = data["token"]
keys = []
keys.concat(data["items"]) if data["items"]
keys.concat(data["packets"]) if data["packets"]
objects = []
objects_by_topic = {}
keys.each do |key|
object = StreamingObject.new(key, start_time, end_time, stream_mode: stream_mode, scope: scope, token: token)
objects_by_topic[object.topic] ||= []
objects_by_topic[object.topic] << object
objects << object
end
if start_time
objects_by_topic.each do |topic, objects|
# Cosmos::Logger.debug "topic:#{topic} objs:#{objects} mode:#{stream_mode}"
objects.each {|object| object.thread_id = @thread_id}
thread = LoggedStreamingThread.new(@thread_id, @channel, @collection, stream_mode, scope: scope)
thread.start
@logged_threads << thread
@thread_id += 1
end
elsif end_time.nil? or end_time > Time.now.to_nsec_from_epoch
# Create a single realtime streaming thread to use the entire collection
if @realtime_thread.nil?
@realtime_thread = RealtimeStreamingThread.new(@channel, @collection, stream_mode)
@realtime_thread.start
end
end
@collection.add(objects)
end
end
def remove(data)
keys = []
keys.concat(data["items"]) if data["items"]
keys.concat(data["packets"]) if data["packets"]
@collection.remove(keys)
end
def kill
threads = []
if @realtime_thread
@realtime_thread.stop
threads << @realtime_thread
end
@logged_threads.each do |thread|
thread.stop
threads << thread
end
# Allow the threads a chance to stop (1.1s each)
threads.each do |thread|
i = 0
while thread.alive? or i < 110 do
sleep 0.01
i += 1
end
end
# Ok we tried, now initialize everything
@realtime_thread = nil
@logged_threads = []
end
end