cosmos/lib/cosmos/microservices/interface_microservice.rb
# encoding: ascii-8bit
# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This program may also be used under the terms of a commercial or
# enterprise edition license of COSMOS if purchased from the
# copyright holder
require 'cosmos/microservices/microservice'
require 'cosmos/models/interface_model'
require 'cosmos/models/router_model'
require 'cosmos/models/interface_status_model'
require 'cosmos/models/router_status_model'
require 'cosmos/topics/telemetry_topic'
require 'cosmos/topics/command_topic'
require 'cosmos/topics/command_decom_topic'
require 'cosmos/topics/interface_topic'
require 'cosmos/topics/router_topic'
module Cosmos
class InterfaceCmdHandlerThread
  # @param interface [Interface] Interface whose command topic is serviced
  # @param tlm [InterfaceMicroservice] Owner that performs connect/disconnect requests
  # @param scope [String] COSMOS scope
  def initialize(interface, tlm, scope:)
    @interface = interface
    @tlm = tlm
    @scope = scope
  end

  # Starts a thread running #run. Anything escaping run is logged and
  # re-raised so the thread dies visibly.
  def start
    @thread = Thread.new do
      run()
    rescue Exception => err
      Logger.error "#{@interface.name}: Command handler thread died: #{err.formatted}"
      raise err
    end
  end

  # Stops the handler thread via Cosmos.kill_thread
  def stop
    Cosmos.kill_thread(self, @thread)
  end

  # Publishes a shutdown request on the interface topic; presumably delivered
  # back to #run as a 'shutdown' message, which makes it return.
  def graceful_kill
    InterfaceTopic.shutdown(@interface, scope: @scope)
  end

  # Services messages from InterfaceTopic.receive_commands until a shutdown
  # message arrives. The value of the block is the reply for each message:
  # 'SUCCESS', "HazardousError\n<description>\n<formatted command>", or the
  # message of any error raised while handling it (errors are logged and
  # reported rather than allowed to kill the handler thread).
  #
  # Messages on the interface's own CMD}INTERFACE topic control the interface
  # (shutdown, connect, disconnect, raw write, raw logging). All other
  # messages are target commands given either as 'cmd_params' (JSON, built by
  # target/command name) or 'cmd_buffer' (raw bytes to be identified).
  def run
    InterfaceTopic.receive_commands(@interface, scope: @scope) do |topic, msg_hash|
      # Check for a raw write to the interface
      if topic.match?(/CMD}INTERFACE/)
        if msg_hash['shutdown']
          Logger.info "#{@interface.name}: Shutdown requested"
          return # Leave run (and receive_commands) so the thread can finish
        end
        if msg_hash['connect']
          Logger.info "#{@interface.name}: Connect requested"
          @tlm.attempting()
          next 'SUCCESS'
        end
        if msg_hash['disconnect']
          Logger.info "#{@interface.name}: Disconnect requested"
          @tlm.disconnect(false)
          next 'SUCCESS'
        end
        if msg_hash['raw']
          Logger.info "#{@interface.name}: Write raw"
          # A raw interface write results in an UNKNOWN packet
          command = unknown_command(msg_hash['raw'])
          command.received_time = Time.now
          CommandTopic.write_packet(command, scope: @scope)
          @interface.write_raw(msg_hash['raw'])
          next 'SUCCESS'
        end
        if msg_hash.key?('log_raw')
          if msg_hash['log_raw'] == 'true'
            Logger.info "#{@interface.name}: Enable raw logging"
            @interface.start_raw_logging
          else
            Logger.info "#{@interface.name}: Disable raw logging"
            @interface.stop_raw_logging
          end
          next 'SUCCESS'
        end
      end

      target_name = msg_hash['target_name']
      cmd_name = msg_hash['cmd_name']
      cmd_params = nil
      cmd_buffer = nil
      hazardous_check = nil
      if msg_hash['cmd_params']
        cmd_params = JSON.parse(msg_hash['cmd_params'])
        range_check = ConfigParser.handle_true_false(msg_hash['range_check'])
        raw = ConfigParser.handle_true_false(msg_hash['raw'])
        hazardous_check = ConfigParser.handle_true_false(msg_hash['hazardous_check'])
      elsif msg_hash['cmd_buffer']
        cmd_buffer = msg_hash['cmd_buffer']
      end

      if cmd_params
        command = System.commands.build_cmd(target_name, cmd_name, cmd_params, range_check, raw)
      elsif cmd_buffer
        command = identify_command(cmd_buffer, target_name)
      else
        raise "Invalid command received:\n #{msg_hash}"
      end
      command.received_time = Time.now

      if hazardous_check
        hazardous, hazardous_description = System.commands.cmd_pkt_hazardous?(command)
        # Return back the error, description, and the formatted command
        # This allows the error handler to simply re-send the command
        next "HazardousError\n#{hazardous_description}\n#{System.commands.format(command)}" if hazardous
      end

      @interface.write(command)
      CommandTopic.write_packet(command, scope: @scope)
      CommandDecomTopic.write_packet(command, scope: @scope)
      InterfaceStatusModel.set(@interface.as_json, scope: @scope)
      next 'SUCCESS'
    rescue => e
      # Report any failure back to the sender instead of letting it escape
      # receive_commands and kill the handler thread
      Logger.error "#{@interface.name}: #{e.formatted}"
      next e.message
    end
  end

  private

  # Identifies cmd_buffer against target_name when given, otherwise against
  # all of the interface's targets, falling back to UNKNOWN UNKNOWN.
  def identify_command(cmd_buffer, target_name)
    target_names = target_name ? [target_name] : @interface.target_names
    System.commands.identify(cmd_buffer, target_names) || unknown_command(cmd_buffer)
  end

  # Increments received_count on the UNKNOWN UNKNOWN command held by
  # System.commands and returns a clone of it carrying buffer.
  def unknown_command(buffer)
    command = System.commands.packet('UNKNOWN', 'UNKNOWN')
    command.received_count += 1
    command = command.clone
    command.buffer = buffer
    command
  end
end
class RouterTlmHandlerThread
  # @param router [Interface] Router whose telemetry topic is serviced
  # @param tlm [InterfaceMicroservice] Owner that performs connect/disconnect requests
  # @param scope [String] COSMOS scope
  def initialize(router, tlm, scope:)
    @router = router
    @tlm = tlm
    @scope = scope
  end

  # Starts a thread running #run. Anything escaping run is logged and
  # re-raised so the thread dies visibly.
  def start
    @thread = Thread.new do
      run()
    rescue Exception => err
      Logger.error "#{@router.name}: Telemetry handler thread died: #{err.formatted}"
      raise err
    end
  end

  # Stops the handler thread via Cosmos.kill_thread
  def stop
    Cosmos.kill_thread(self, @thread)
  end

  # Publishes a shutdown request on the router topic; presumably delivered
  # back to #run as a 'shutdown' message, which makes it return.
  def graceful_kill
    RouterTopic.shutdown(@router, scope: @scope)
  end

  # Services messages from RouterTopic.receive_telemetry until a shutdown
  # message arrives. Messages on the router's own CMD}ROUTER topic control the
  # router and are answered 'SUCCESS'. Every other message is a telemetry
  # packet that is forwarded through the router while it is connected; the
  # reply is 'SUCCESS' or the message of the error that prevented the write.
  # While the router is not connected the packet is dropped and the block
  # yields nil.
  def run
    RouterTopic.receive_telemetry(@router, scope: @scope) do |topic, msg_hash|
      # Check for commands to the router itself
      if topic.match?(/CMD}ROUTER/)
        if msg_hash['shutdown']
          Logger.info "#{@router.name}: Shutdown requested"
          return # Leave run (and receive_telemetry) so the thread can finish
        end
        if msg_hash['connect']
          Logger.info "#{@router.name}: Connect requested"
          @tlm.attempting()
        end
        if msg_hash['disconnect']
          Logger.info "#{@router.name}: Disconnect requested"
          @tlm.disconnect(false)
        end
        if msg_hash.key?('log_raw')
          if msg_hash['log_raw'] == 'true'
            Logger.info "#{@router.name}: Enable raw logging"
            @router.start_raw_logging
          else
            Logger.info "#{@router.name}: Disable raw logging"
            @router.stop_raw_logging
          end
        end
        next 'SUCCESS'
      end

      next unless @router.connected?

      begin
        # Decoding is inside the rescue so an unknown packet or malformed
        # message is reported instead of killing the handler thread
        target_name = msg_hash["target_name"]
        packet_name = msg_hash["packet_name"]
        packet = System.telemetry.packet(target_name, packet_name)
        packet.stored = ConfigParser.handle_true_false(msg_hash["stored"])
        packet.received_time = Time.from_nsec_from_epoch(msg_hash["time"].to_i)
        packet.received_count = msg_hash["received_count"].to_i
        packet.buffer = msg_hash["buffer"]
        @router.write(packet)
        RouterStatusModel.set(@router.as_json, scope: @scope)
        next 'SUCCESS'
      rescue => e
        Logger.error "#{@router.name}: #{e.formatted}"
        next e.message
      end
    end
  end
end
class InterfaceMicroservice < Microservice
  # Number of leading bytes of an unidentified packet shown in the warning log
  UNKNOWN_BYTES_TO_PRINT = 16

  # Builds the interface or router named by the microservice, maps it to its
  # targets, publishes its initial status and starts the handler thread that
  # services commands (interfaces) or telemetry (routers) sent to it.
  #
  # @param name [String] Microservice name split on '__': field 0 is the
  #   scope and field 2 is the interface/router name
  def initialize(name)
    super(name)
    # 'INTERFACE' or 'ROUTER', derived from the concrete class name
    # (e.g. Cosmos::InterfaceMicroservice => 'INTERFACE')
    @interface_or_router = self.class.name.to_s.split("Microservice")[0].upcase.split("::")[-1]
    @scope = name.split("__")[0]
    interface_name = name.split("__")[2]
    if @interface_or_router == 'INTERFACE'
      @interface = InterfaceModel.get_model(name: interface_name, scope: @scope).build
    else
      @interface = RouterModel.get_model(name: interface_name, scope: @scope).build
    end
    @interface.name = interface_name
    # Map the interface to the interface's targets. Iterate with .each: a block
    # passed directly to target_names is ignored unless that method yields.
    @interface.target_names.each do |target_name|
      target = System.targets[target_name]
      target.interface = @interface
    end
    @interface.state = @interface.connect_on_startup ? 'ATTEMPTING' : 'DISCONNECTED'
    publish_interface_status()
    @interface_thread_sleeper = Sleeper.new
    @cancel_thread = false
    # Error messages already written to exception files (each written once)
    @connection_failed_messages = []
    @connection_lost_messages = []
    # Serializes connect (in run), disconnect and stop so that connect is
    # never called after stop
    @mutex = Mutex.new
    handler_class = @interface_or_router == 'INTERFACE' ? InterfaceCmdHandlerThread : RouterTlmHandlerThread
    @handler_thread = handler_class.new(@interface, self, scope: @scope)
    @handler_thread.start
  end

  # External method to be called by the handler thread to connect.
  # Only the state is set here; the run loop sees ATTEMPTING and connects.
  def attempting
    @interface.state = 'ATTEMPTING'
    publish_interface_status()
  end

  # Main loop, driven by @interface.state until stop sets @cancel_thread:
  #   DISCONNECTED - idle, waking every second to see if a connect was requested
  #   ATTEMPTING   - connect (under @mutex so stop cannot race it)
  #   CONNECTED    - read and handle packets, or, when reads are not allowed,
  #                  check the connection every second
  # A fatal error is logged, passed to Cosmos.handle_fatal_exception and
  # followed by a disconnect without reconnect. The final status is published
  # on exit.
  def run
    begin
      if @interface.read_allowed?
        Logger.info "#{@interface.name}: Starting packet reading"
      else
        Logger.info "#{@interface.name}: Starting connection maintenance"
      end
      until @cancel_thread
        case @interface.state
        when 'DISCONNECTED'
          begin
            # Just wait to see if we should connect later
            @interface_thread_sleeper.sleep(1)
          rescue Exception
            # Sleep interrupted; the loop condition decides whether to exit
          end
        when 'ATTEMPTING'
          begin
            @mutex.synchronize do
              # We need to make sure connect is not called after stop() has been called
              connect() unless @cancel_thread
            end
          rescue Exception => connect_error
            handle_connection_failed(connect_error)
          end
        when 'CONNECTED'
          if @interface.read_allowed?
            begin
              packet = @interface.read
              if packet
                handle_packet(packet)
                @count += 1
              else
                Logger.info "#{@interface.name}: Internal disconnect requested (returned nil)"
                handle_connection_lost()
              end
            rescue Exception => err
              handle_connection_lost(err)
            end
          else
            @interface_thread_sleeper.sleep(1)
            handle_connection_lost() unless @interface.connected?
          end
        end
      end
    rescue Exception => error
      Logger.error "#{@interface.name}: Packet reading thread died: #{error.formatted}"
      Cosmos.handle_fatal_exception(error)
      # Try to do clean disconnect because we're going down
      disconnect(false)
    end
    publish_interface_status()
    Logger.info "#{@interface.name}: Stopped packet reading"
  end

  # Identifies a packet read from the interface and writes it to the
  # telemetry topic. Non-stored packets update the current value table;
  # stored packets do not. Data that cannot be identified becomes an UNKNOWN
  # UNKNOWN packet: its CVT entry is set here and its first
  # UNKNOWN_BYTES_TO_PRINT bytes are logged.
  #
  # @param packet [Packet] Packet returned by @interface.read
  def handle_packet(packet)
    publish_interface_status()
    packet.received_time = Time.now.sys unless packet.received_time
    if packet.stored
      # Stored telemetry does not update the current value table
      identified_packet = System.telemetry.identify_and_define_packet(packet, @target_names)
    elsif packet.identified?
      begin
        # Preidentifed packet - place it into the current value table
        identified_packet = System.telemetry.update!(packet.target_name,
                                                     packet.packet_name,
                                                     packet.buffer)
      rescue RuntimeError
        # Packet identified but we don't know about it
        # Clear packet_name and target_name and try to identify
        Logger.warn "#{@interface.name}: Received unknown identified telemetry: #{packet.target_name} #{packet.packet_name}"
        packet.target_name = nil
        packet.packet_name = nil
        identified_packet = System.telemetry.identify!(packet.buffer, @target_names)
      end
    else
      # Packet needs to be identified
      identified_packet = System.telemetry.identify!(packet.buffer, @target_names)
    end

    if identified_packet
      identified_packet.received_time = packet.received_time
      identified_packet.stored = packet.stored
      identified_packet.extra = packet.extra
      packet = identified_packet
    else
      unknown_packet = System.telemetry.update!('UNKNOWN', 'UNKNOWN', packet.buffer)
      unknown_packet.received_time = packet.received_time
      unknown_packet.stored = packet.stored
      unknown_packet.extra = packet.extra
      packet = unknown_packet
      json_hash = CvtModel.build_json_from_packet(packet)
      CvtModel.set(json_hash, target_name: packet.target_name, packet_name: packet.packet_name, scope: @scope)
      num_bytes_to_print = [UNKNOWN_BYTES_TO_PRINT, packet.length].min
      data = packet.buffer(false)[0..(num_bytes_to_print - 1)]
      prefix = data.each_byte.map { |byte| sprintf("%02X", byte) }.join()
      Logger.warn "#{@interface.name} #{packet.target_name} packet length: #{packet.length} starting with: #{prefix}"
    end

    # Write to stream
    packet.received_count += 1
    TelemetryTopic.write_packet(packet, scope: @scope)
  end

  # Records and logs a failed connect, writes an exception file once per
  # distinct message for uncommon errors, stops the loop on Interrupt, then
  # disconnects (which may schedule a reconnect).
  #
  # @param connect_error [Exception] Error raised by connect
  def handle_connection_failed(connect_error)
    @error = connect_error
    Logger.error "#{@interface.name}: Connection Failed: #{connect_error.formatted(false, false)}"
    case connect_error
    when Interrupt
      Logger.info "#{@interface.name}: Closing from signal"
      @cancel_thread = true
    when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ENOTSOCK, Errno::EHOSTUNREACH, IOError
      # Do not write an exception file for these extremely common cases
    else
      # Cancellations and timeouts are also too common to write exception files for
      common = connect_error.is_a?(RuntimeError) && /canceled|timeout/.match?(connect_error.message)
      unless common
        Logger.error "#{@interface.name}: #{connect_error.formatted}"
        unless @connection_failed_messages.include?(connect_error.message)
          Cosmos.write_exception_file(connect_error)
          @connection_failed_messages << connect_error.message
        end
      end
    end
    disconnect() # Ensure we do a clean disconnect
  end

  # Handles a lost connection, writing an exception file once per distinct
  # message for uncommon errors and stopping the loop on Interrupt, then
  # disconnects.
  #
  # @param err [Exception, nil] Error that ended the connection; nil when no
  #   exception was involved (e.g. read returned nil)
  # @param reconnect [Boolean] Passed to disconnect as allow_reconnect
  def handle_connection_lost(err = nil, reconnect: true)
    if err
      @error = err
      Logger.info "#{@interface.name}: Connection Lost: #{err.formatted(false, false)}"
      case err
      when Interrupt
        Logger.info "#{@interface.name}: Closing from signal"
        @cancel_thread = true
      when Errno::ECONNABORTED, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::EBADF, Errno::ENOTSOCK, IOError
        # Do not write an exception file for these extremely common cases
      else
        Logger.error "#{@interface.name}: #{err.formatted}"
        unless @connection_lost_messages.include?(err.message)
          Cosmos.write_exception_file(err)
          @connection_lost_messages << err.message
        end
      end
    else
      Logger.info "#{@interface.name}: Connection Lost"
    end
    disconnect(reconnect) # Ensure we do a clean disconnect
  end

  # Connects the interface and publishes the CONNECTED state. Exceptions from
  # @interface.connect propagate to the caller (the run loop).
  def connect
    Logger.info "#{@interface.name}: Connecting ..."
    @interface.connect
    @interface.state = 'CONNECTED'
    publish_interface_status()
    Logger.info "#{@interface.name}: Connection Success"
  end

  # Disconnects the interface if connected, then either schedules a reconnect
  # or marks it DISCONNECTED.
  #
  # @param allow_reconnect [Boolean] When true, the interface has
  #   auto_reconnect set and its state is not DISCONNECTED, the state becomes
  #   ATTEMPTING and this call sleeps reconnect_delay (unless stopping).
  #   Otherwise the state becomes DISCONNECTED.
  def disconnect(allow_reconnect = true)
    return if @interface.state == 'DISCONNECTED' && !@interface.connected?

    # Synchronize the calls to @interface.disconnect since it takes an unknown
    # amount of time. If two calls to disconnect stack up, the connected?
    # check avoids disconnecting twice.
    @mutex.synchronize do
      begin
        @interface.disconnect if @interface.connected?
      rescue => e
        Logger.error "Disconnect: #{@interface.name}: #{e.formatted}"
      end
    end
    # If the interface is set to auto_reconnect then delay so the thread
    # can come back around and allow the interface a chance to reconnect.
    if allow_reconnect && @interface.auto_reconnect && @interface.state != 'DISCONNECTED'
      attempting()
      @interface_thread_sleeper.sleep(@interface.reconnect_delay) unless @cancel_thread
    else
      @interface.state = 'DISCONNECTED'
      publish_interface_status()
    end
  end

  # Disconnect from the interface and stop the thread. The status entry is
  # destroyed even if the interface's disconnect raises.
  def stop
    Logger.info "#{@interface.name}: stop requested"
    @mutex.synchronize do
      # Need to make sure that @cancel_thread is set and the interface disconnected within
      # mutex to ensure that connect() is not called when we want to stop()
      @cancel_thread = true
      @handler_thread.stop
      @interface_thread_sleeper.cancel
      begin
        @interface.disconnect
      rescue => e
        # Keep tearing down so the status entry is removed and shutdown completes
        Logger.error "Disconnect: #{@interface.name}: #{e.formatted}"
      end
      valid_interface = interface_status_model.get_model(name: @interface.name, scope: @scope)
      valid_interface.destroy if valid_interface
    end
  end

  # Stops the interface, then runs the base Microservice shutdown.
  # @param sig Ignored
  def shutdown(sig = nil)
    Logger.info "#{@interface.name}: shutdown requested"
    stop()
    super()
  end

  # Intentionally empty; per the original author it exists only to avoid a warning
  def graceful_kill
    # Just to avoid warning
  end

  private

  # Status model class for this microservice: InterfaceStatusModel for
  # interfaces, RouterStatusModel for routers
  def interface_status_model
    @interface_or_router == 'INTERFACE' ? InterfaceStatusModel : RouterStatusModel
  end

  # Writes the interface's current as_json to its status model
  def publish_interface_status
    interface_status_model.set(@interface.as_json, scope: @scope)
  end
end
end
# Launch the microservice only when this file is executed directly, not when
# it is required by another file.
if $PROGRAM_NAME == __FILE__
  Cosmos::InterfaceMicroservice.run
end